From 956caa12dbcc1dcb3de32075144d504b2bbbc015 Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 30 Jun 2025 17:23:56 +1000 Subject: [PATCH 1/9] raft: Create NewPartitionKey constructor A subsequent commit will consolidate the PartitionKey composite key into a single SHA256 hash value. In preparation for this, let's introduce and use a constructor function for the PartitionKey type. This will allow us to easily transition to the new way of creating keys. --- .../gitaly/service/raft/send_message_test.go | 69 +++-------------- .../gitaly/service/raft/send_snapshot_test.go | 75 ++----------------- .../gitaly/storage/raftmgr/grpc_transport.go | 7 +- .../storage/raftmgr/grpc_transport_test.go | 15 +--- .../storage/raftmgr/raft_enabled_storage.go | 11 +-- internal/gitaly/storage/raftmgr/replica.go | 24 +++--- .../gitaly/storage/raftmgr/replica_test.go | 20 +---- .../storage/raftmgr/routing_table_test.go | 60 +++------------ 8 files changed, 52 insertions(+), 229 deletions(-) diff --git a/internal/gitaly/service/raft/send_message_test.go b/internal/gitaly/service/raft/send_message_test.go index 50fe2f2a15b..016a3a5c8bb 100644 --- a/internal/gitaly/service/raft/send_message_test.go +++ b/internal/gitaly/service/raft/send_message_test.go @@ -56,10 +56,7 @@ func TestServer_SendMessage(t *testing.T) { registry := storage.(*raftmgr.RaftEnabledStorage).GetReplicaRegistry() replica := &mockRaftReplica{} - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - } + partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) // Register storage two @@ -83,11 +80,8 @@ func TestServer_SendMessage(t *testing.T) { req: &gitalypb.RaftMessageRequest{ ClusterId: "test-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameOne, + PartitionKey: 
raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -100,11 +94,8 @@ func TestServer_SendMessage(t *testing.T) { req: &gitalypb.RaftMessageRequest{ ClusterId: "test-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameTwo, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameTwo, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -116,11 +107,8 @@ func TestServer_SendMessage(t *testing.T) { desc: "missing cluster ID", req: &gitalypb.RaftMessageRequest{ ReplicaId: &gitalypb.ReplicaID{ - StorageName: "storage-name", - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: "storage-name", + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -135,11 +123,8 @@ func TestServer_SendMessage(t *testing.T) { req: &gitalypb.RaftMessageRequest{ ClusterId: "wrong-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: "storage-name", - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: "storage-name", + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -149,42 +134,6 @@ func TestServer_SendMessage(t *testing.T) { expectedGrpcErr: codes.PermissionDenied, expectedError: `rpc error: code = PermissionDenied desc = message from wrong cluster: got "wrong-cluster", want "test-cluster"`, }, - { - desc: "missing authority name", - req: &gitalypb.RaftMessageRequest{ - ClusterId: "test-cluster", - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - PartitionId: 1, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - expectedError: "rpc error: code = 
InvalidArgument desc = authority_name is required", - }, - { - desc: "missing partition ID", - req: &gitalypb.RaftMessageRequest{ - ClusterId: "test-cluster", - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - expectedError: "rpc error: code = InvalidArgument desc = partition_id is required", - }, } for _, tc := range testCases { diff --git a/internal/gitaly/service/raft/send_snapshot_test.go b/internal/gitaly/service/raft/send_snapshot_test.go index d63a8454792..d1f46b90bb0 100644 --- a/internal/gitaly/service/raft/send_snapshot_test.go +++ b/internal/gitaly/service/raft/send_snapshot_test.go @@ -57,10 +57,7 @@ func TestServer_SendSnapshot_Success(t *testing.T) { registry := storage.(*raftmgr.RaftEnabledStorage).GetReplicaRegistry() replica := &mockRaftReplica{} - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - } + partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) client := runRaftServer(t, ctx, cfg, mockNode) @@ -75,11 +72,8 @@ func TestServer_SendSnapshot_Success(t *testing.T) { RaftMsg: &gitalypb.RaftMessageRequest{ ClusterId: clusterID, ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameOne, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -130,11 +124,8 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { RaftSnapshotPayload: &gitalypb.RaftSnapshotMessageRequest_RaftMsg{ RaftMsg: &gitalypb.RaftMessageRequest{ ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - 
}, + StorageName: storageNameOne, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -153,11 +144,8 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { RaftMsg: &gitalypb.RaftMessageRequest{ ClusterId: "wrong-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameOne, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -169,50 +157,6 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { expectedGrpcErr: codes.PermissionDenied, expectedError: `rpc error: code = PermissionDenied desc = message from wrong cluster: got "wrong-cluster", want "test-cluster"`, }, - { - desc: "missing authority name", - req: &gitalypb.RaftSnapshotMessageRequest{ - RaftSnapshotPayload: &gitalypb.RaftSnapshotMessageRequest_RaftMsg{ - RaftMsg: &gitalypb.RaftMessageRequest{ - ClusterId: clusterID, - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - PartitionId: 1, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - expectedError: "rpc error: code = InvalidArgument desc = authority_name is required", - }, - { - desc: "missing partition ID", - req: &gitalypb.RaftSnapshotMessageRequest{ - RaftSnapshotPayload: &gitalypb.RaftSnapshotMessageRequest_RaftMsg{ - RaftMsg: &gitalypb.RaftMessageRequest{ - ClusterId: clusterID, - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - expectedError: "rpc error: code = InvalidArgument desc = partition_id is required", - }, } for _, tc := range 
errorTestCases { @@ -254,10 +198,7 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { registry := storage.(*raftmgr.RaftEnabledStorage).GetReplicaRegistry() replica := &mockRaftReplica{} - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - } + partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) client := runRaftServer(t, ctx, cfg, mockNode) diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go index 4a2d5c0c881..d2c9811aff8 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport.go @@ -325,11 +325,8 @@ func (t *GrpcTransport) SendSnapshot(ctx context.Context, pk *gitalypb.Partition RaftMsg: &gitalypb.RaftMessageRequest{ ClusterId: t.cfg.Raft.ClusterID, ReplicaId: &gitalypb.ReplicaID{ - StorageName: replica.GetStorageName(), - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: pk.GetAuthorityName(), - PartitionId: pk.GetPartitionId(), - }, + StorageName: replica.GetStorageName(), + PartitionKey: pk, }, Message: &message, }, diff --git a/internal/gitaly/storage/raftmgr/grpc_transport_test.go b/internal/gitaly/storage/raftmgr/grpc_transport_test.go index 67e796ab5b8..c39ad4921b5 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport_test.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport_test.go @@ -172,10 +172,7 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) { require.NoError(t, leader.transport.(*GrpcTransport).connectionPool.Close()) }) - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(tc.partitionID), - AuthorityName: storageName, - } + partitionKey := NewPartitionKey(storageName, storage.PartitionID(tc.partitionID)) mgr, err := leader.managerRegistry.GetReplica(partitionKey) require.NoError(t, err) @@ -294,10 +291,7 @@ func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partit // Create and 
set up replica replica := newReplica(config) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: storageName, - PartitionId: uint64(partitionID), - } + partitionKey := NewPartitionKey(storageName, storage.PartitionID(partitionID)) // Register the manager with the registry registries[i].RegisterReplica(partitionKey, replica) @@ -575,10 +569,7 @@ func TestGrpcTransport_SendSnapshot(t *testing.T) { }() } - err = leader.transport.SendSnapshot(ctx, &gitalypb.PartitionKey{ - AuthorityName: storageName, - PartitionId: 1, - }, msg, snapshot) + err = leader.transport.SendSnapshot(ctx, NewPartitionKey(storageName, 1), msg, snapshot) if tc.expectedError != "" { require.ErrorContains(t, err, tc.expectedError) } else { diff --git a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go index bad08c6074a..384c83bba63 100644 --- a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go +++ b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go @@ -8,7 +8,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/log" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) // RaftEnabledStorage wraps a storage.Storage instance with Raft functionality @@ -37,10 +36,7 @@ func (s *RaftEnabledStorage) GetReplicaRegistry() ReplicaRegistry { // RegisterReplica registers a replica with this RaftEnabledStorage // This should be called after both the replica and RaftEnabledStorage are created func (s *RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, replica *Replica) error { - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(partitionID), - AuthorityName: replica.authorityName, - } + partitionKey := NewPartitionKey(replica.authorityName, partitionID) s.replicaRegistry.RegisterReplica(partitionKey, replica) return nil @@ -49,10 +45,7 @@ func (s 
*RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, re // DeregisterReplica removes a replica from this RaftEnabledStorage. // This should be called when the replica is closing. func (s *RaftEnabledStorage) DeregisterReplica(replica *Replica) { - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(replica.ptnID), - AuthorityName: replica.authorityName, - } + partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) s.replicaRegistry.DeregisterReplica(partitionKey) } diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index b577ca6440d..2b297fba639 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -861,10 +861,7 @@ func (replica *Replica) processConfChange(entry raftpb.Entry) error { return fmt.Errorf("saving config state: %w", err) } - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(replica.ptnID), - } + partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) routingTable := replica.raftEnabledStorage.GetRoutingTable() if routingTable == nil { @@ -894,10 +891,7 @@ func (replica *Replica) sendMessages(rd *raft.Ready) error { // techniques such as batching health checks and quiescing inactive groups. 
// // See https://gitlab.com/gitlab-org/gitaly/-/issues/6304 - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(replica.ptnID), - } + partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) transport := replica.raftEnabledStorage.GetTransport() if transport == nil { return fmt.Errorf("transport not found") @@ -1069,10 +1063,7 @@ func (replica *Replica) proposeMembershipChange( } func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) error { - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(replica.ptnID), - } + partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) _, err := routingTable.Translate(partitionKey, memberID) if err != nil { @@ -1081,4 +1072,13 @@ func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) return nil } +// NewPartitionKey creates a partition key for a newly-minted partition. A partition should only +// ever have a single PartitionKey, computed by the replica which first created the partition. 
+func NewPartitionKey(storageName string, partitionID storage.PartitionID) *gitalypb.PartitionKey { + return &gitalypb.PartitionKey{ + AuthorityName: storageName, + PartitionId: uint64(partitionID), + } +} + var _ = (storage.LogManager)(&Replica{}) // Ensure Replica implements LogManager interface diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index dd223561388..8ac7d45cfe1 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -1683,10 +1683,7 @@ func TestReplica_StorageConnection(t *testing.T) { registry := raftNodeStorage.GetReplicaRegistry() require.NotNil(t, registry) - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(partitionID), - AuthorityName: storageName, - } + partitionKey := NewPartitionKey(storageName, partitionID) registeredReplica, err := registry.GetReplica(partitionKey) require.NoError(t, err) @@ -1726,10 +1723,7 @@ func TestReplica_AddNode(t *testing.T) { replica, err := createRaftReplica(t, ctx, memberID, socketPath, raftCfg, partitionID, metrics, opts...) 
require.NoError(t, err) - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(partitionID), - AuthorityName: cfg.Storages[0].Name, - } + partitionKey := NewPartitionKey(cfg.Storages[0].Name, partitionID) registry.RegisterReplica(partitionKey, replica) transport.registry = registry @@ -1793,10 +1787,7 @@ func TestReplica_AddNode(t *testing.T) { require.NotNil(t, raftEnabledStorage, "storage should be a RaftEnabledStorage") routingTable := raftEnabledStorage.GetRoutingTable() - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(partitionID), - } + partitionKey := NewPartitionKey(replica.authorityName, partitionID) // Create second node replicaTwo, socketPathTwo, srvTwo := createTestNode(t, ctx, 3, partitionID, raftCfg, metrics) @@ -1880,10 +1871,7 @@ func TestReplica_AddNode(t *testing.T) { require.NotNil(t, raftEnabledStorage, "storage should be a RaftEnabledStorage") routingTable := raftEnabledStorage.GetRoutingTable() - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(partitionID), - } + partitionKey := NewPartitionKey(replica.authorityName, partitionID) var destinationAddresses []string var servers []*grpc.Server diff --git a/internal/gitaly/storage/raftmgr/routing_table_test.go b/internal/gitaly/storage/raftmgr/routing_table_test.go index 658030183ee..47eb74d660d 100644 --- a/internal/gitaly/storage/raftmgr/routing_table_test.go +++ b/internal/gitaly/storage/raftmgr/routing_table_test.go @@ -24,10 +24,7 @@ func TestPersistentRoutingTable(t *testing.T) { t.Run("add and translate member", func(t *testing.T) { memberID := 1 address := "localhost:1234" - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) entry := RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -54,10 +51,7 @@ func TestPersistentRoutingTable(t *testing.T) { }) t.Run("stale entry 
rejected", func(t *testing.T) { - key := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 2, - } + key := NewPartitionKey("test-authority", 2) entry1 := RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -83,10 +77,7 @@ func TestPersistentRoutingTable(t *testing.T) { }) t.Run("node not found", func(t *testing.T) { - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 3, - } + partitionKey := NewPartitionKey("test-authority", 3) memberID := 999 // Non-existent node @@ -116,10 +107,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) @@ -148,10 +136,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -206,10 +191,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddLearnerNode) @@ -236,10 +218,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) changes := NewReplicaConfChanges(1, 1, 0, 1, createMetadata("localhost:1234")) changes.AddChange(0, ConfChangeAddNode) @@ 
-261,10 +240,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) // First add a node changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) @@ -294,10 +270,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) // Add a node with ID 1 changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) @@ -327,10 +300,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) entry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -368,10 +338,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -416,10 +383,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ -- GitLab From e395ea0e05a6ecd5785bc5ee66838ed46a880d70 Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 30 Jun 2025 17:38:04 +1000 Subject: [PATCH 2/9] raft: Use PartitionKey in Replica struct The authorityName and ptnID fields of the Replica struct should be represented as a single opaque key. 
Replace these fields with a new partitionKey field. --- .../storage/raftmgr/raft_enabled_storage.go | 6 +-- internal/gitaly/storage/raftmgr/replica.go | 45 ++++++++----------- .../gitaly/storage/raftmgr/replica_test.go | 6 +-- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go index 384c83bba63..baced9ed82d 100644 --- a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go +++ b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go @@ -36,8 +36,7 @@ func (s *RaftEnabledStorage) GetReplicaRegistry() ReplicaRegistry { // RegisterReplica registers a replica with this RaftEnabledStorage // This should be called after both the replica and RaftEnabledStorage are created func (s *RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, replica *Replica) error { - partitionKey := NewPartitionKey(replica.authorityName, partitionID) - s.replicaRegistry.RegisterReplica(partitionKey, replica) + s.replicaRegistry.RegisterReplica(replica.partitionKey, replica) return nil } @@ -45,8 +44,7 @@ func (s *RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, re // DeregisterReplica removes a replica from this RaftEnabledStorage. // This should be called when the replica is closing. 
func (s *RaftEnabledStorage) DeregisterReplica(replica *Replica) { - partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) - s.replicaRegistry.DeregisterReplica(partitionKey) + s.replicaRegistry.DeregisterReplica(replica.partitionKey) } // Node adds Raft functionality to each storage diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index 2b297fba639..f35742b1717 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -150,21 +150,20 @@ type Replica struct { ctx context.Context // Context for controlling replica's lifecycle cancel context.CancelFunc - memberID uint64 // Member ID of the replica - authorityName string // Name of the storage this partition belongs to - ptnID storage.PartitionID // Unique identifier for the managed partition - node raft.Node // etcd/raft node representation - raftCfg config.Raft // etcd/raft configurations - options ReplicaOptions // Additional replica configuration - logger logging.Logger // Internal logging - logStore *ReplicaLogStore // Persistent storage for Raft logs and state - registry *ReplicaEventRegistry // Event tracking - leadership *ReplicaLeadership // Current leadership information - syncer safe.Syncer // Synchronization operations - wg sync.WaitGroup // Goroutine lifecycle management - ready *ready // Initialization state tracking - started bool // Indicates if replica has been started - metrics RaftMetrics // Scoped metrics for this replica + memberID uint64 // Member ID of the replica + partitionKey *gitalypb.PartitionKey + node raft.Node // etcd/raft node representation + raftCfg config.Raft // etcd/raft configurations + options ReplicaOptions // Additional replica configuration + logger logging.Logger // Internal logging + logStore *ReplicaLogStore // Persistent storage for Raft logs and state + registry *ReplicaEventRegistry // Event tracking + leadership *ReplicaLeadership // Current leadership 
information + syncer safe.Syncer // Synchronization operations + wg sync.WaitGroup // Goroutine lifecycle management + ready *ready // Initialization state tracking + started bool // Indicates if replica has been started + metrics RaftMetrics // Scoped metrics for this replica // Reference to the RaftEnabledStorage that contains this replica raftEnabledStorage *RaftEnabledStorage @@ -286,8 +285,7 @@ func NewReplica( return &Replica{ memberID: memberID, - authorityName: authorityName, - ptnID: partitionID, + partitionKey: NewPartitionKey(authorityName, partitionID), raftCfg: raftCfg, options: options, logStore: logStore, @@ -323,7 +321,7 @@ func (replica *Replica) Initialize(ctx context.Context, appliedLSN storage.LSN) defer replica.mutex.Unlock() if replica.started { - return fmt.Errorf("raft replica for partition %q already started", replica.ptnID) + return fmt.Errorf("raft replica %q already started", replica.partitionKey.String()) } replica.started = true @@ -861,15 +859,13 @@ func (replica *Replica) processConfChange(entry raftpb.Entry) error { return fmt.Errorf("saving config state: %w", err) } - partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) - routingTable := replica.raftEnabledStorage.GetRoutingTable() if routingTable == nil { return fmt.Errorf("routing table not found") } // Apply the changes to the routing table - if err := routingTable.ApplyReplicaConfChange(partitionKey, replicaChanges); err != nil { + if err := routingTable.ApplyReplicaConfChange(replica.partitionKey, replicaChanges); err != nil { return fmt.Errorf("applying conf changes: %w", err) } @@ -891,12 +887,11 @@ func (replica *Replica) sendMessages(rd *raft.Ready) error { // techniques such as batching health checks and quiescing inactive groups. 
// // See https://gitlab.com/gitlab-org/gitaly/-/issues/6304 - partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) transport := replica.raftEnabledStorage.GetTransport() if transport == nil { return fmt.Errorf("transport not found") } - err := transport.Send(replica.ctx, replica, partitionKey, rd.Messages) + err := transport.Send(replica.ctx, replica, replica.partitionKey, rd.Messages) if err != nil { return err } @@ -1063,9 +1058,7 @@ func (replica *Replica) proposeMembershipChange( } func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) error { - partitionKey := NewPartitionKey(replica.authorityName, replica.ptnID) - - _, err := routingTable.Translate(partitionKey, memberID) + _, err := routingTable.Translate(replica.partitionKey, memberID) if err != nil { return fmt.Errorf("translating member ID: %w", err) } diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index 8ac7d45cfe1..b0993d6e05a 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -156,7 +156,7 @@ func TestReplica_Initialize(t *testing.T) { // Second initialization should fail err = mgr.Initialize(ctx, 0) - require.EqualError(t, err, fmt.Sprintf("raft replica for partition %q already started", partitionID)) + require.EqualError(t, err, fmt.Sprintf("raft replica %q already started", mgr.partitionKey.String())) require.NoError(t, mgr.Close()) }) @@ -1787,7 +1787,7 @@ func TestReplica_AddNode(t *testing.T) { require.NotNil(t, raftEnabledStorage, "storage should be a RaftEnabledStorage") routingTable := raftEnabledStorage.GetRoutingTable() - partitionKey := NewPartitionKey(replica.authorityName, partitionID) + partitionKey := replica.partitionKey // Create second node replicaTwo, socketPathTwo, srvTwo := createTestNode(t, ctx, 3, partitionID, raftCfg, metrics) @@ -1871,7 +1871,7 @@ func TestReplica_AddNode(t *testing.T) { require.NotNil(t, 
raftEnabledStorage, "storage should be a RaftEnabledStorage") routingTable := raftEnabledStorage.GetRoutingTable() - partitionKey := NewPartitionKey(replica.authorityName, partitionID) + partitionKey := replica.partitionKey var destinationAddresses []string var servers []*grpc.Server -- GitLab From 40a496229f32d2c5310ccbd66bc3d0cc11641a18 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 15 Jul 2025 17:15:09 +1000 Subject: [PATCH 3/9] raft: Use PartitionKey in Replica constructors The RaftReplicaFactory function and the NewReplica constructor currently take the storageName and partitionID as arguments. The storageName will continue serving a purpose as the name of the storage on the newly-added node where we want the replicated partition to reside. The partitionID however will no longer be needed. When a partition is replicated to a new node, the new node's partition manager will take care of assigning a new partition ID. We threaded the ID as an argument previously as a way to compose the PartitionKey, so this can now be removed. Replace partitionID with PartitionKey, and add some TODO breadcrumbs for important follow-ups. 
--- .../storage/raftmgr/raft_enabled_storage.go | 2 +- internal/gitaly/storage/raftmgr/replica.go | 24 +++++++++---------- .../gitaly/storage/raftmgr/replica_test.go | 20 ++++++++-------- .../gitaly/storage/raftmgr/testhelper_test.go | 2 +- .../storage/storagemgr/partition/factory.go | 9 ++++++- 5 files changed, 31 insertions(+), 26 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go index baced9ed82d..882935924df 100644 --- a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go +++ b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go @@ -35,7 +35,7 @@ func (s *RaftEnabledStorage) GetReplicaRegistry() ReplicaRegistry { // RegisterReplica registers a replica with this RaftEnabledStorage // This should be called after both the replica and RaftEnabledStorage are created -func (s *RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, replica *Replica) error { +func (s *RaftEnabledStorage) RegisterReplica(replica *Replica) error { s.replicaRegistry.RegisterReplica(replica.partitionKey, replica) return nil diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index f35742b1717..c81fe11f76c 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -206,7 +206,7 @@ func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ReplicaOptions, error type RaftReplicaFactory func( memberID uint64, storageName string, - partitionID storage.PartitionID, + partitionKey *gitalypb.PartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -218,7 +218,7 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF return func( memberID uint64, storageName string, - partitionID storage.PartitionID, + partitionKey *gitalypb.PartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -233,14 +233,14 @@ func 
DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF return nil, fmt.Errorf("storage %q is not a RaftEnabledStorage", storageName) } - replica, err := NewReplica(memberID, storageName, partitionID, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) + replica, err := NewReplica(memberID, partitionKey, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) if err != nil { return nil, fmt.Errorf("create replica %q: %w", storageName, err) } - if err := raftEnabledStorage.RegisterReplica(partitionID, replica); err != nil { - return nil, fmt.Errorf("register replica for partition %d in storage %q: %w", - partitionID, storageName, err) + if err := raftEnabledStorage.RegisterReplica(replica); err != nil { + return nil, fmt.Errorf("register replica %q in storage %q: %w", + partitionKey.String(), storageName, err) } return replica, nil @@ -253,8 +253,7 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF // the Raft protocol operation. 
func NewReplica( memberID uint64, - authorityName string, - partitionID storage.PartitionID, + partitionKey *gitalypb.PartitionKey, raftCfg config.Raft, logStore *ReplicaLogStore, raftEnabledStorage *RaftEnabledStorage, @@ -276,16 +275,15 @@ func NewReplica( } logger = logger.WithFields(logging.Fields{ - "component": "raft", - "raft.authority": authorityName, - "raft.partition": partitionID, + "component": "raft", + "raft.partitionKey": partitionKey.String(), }) - scopedMetrics := metrics.Scope(authorityName) + scopedMetrics := metrics.Scope(partitionKey.AuthorityName) return &Replica{ memberID: memberID, - partitionKey: NewPartitionKey(authorityName, partitionID), + partitionKey: partitionKey, raftCfg: raftCfg, options: options, logStore: logStore, diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index b0993d6e05a..ea66555e788 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -239,7 +239,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr, err := raftFactory(1, storageName, partitionID, logStore, logger, metrics) + mgr, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -317,7 +317,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr, err := raftFactory(1, storageName, partitionID, logStore, logger, metrics) + mgr, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -362,7 +362,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, 
WithEntryRecorder(recorder)) - mgr1, err := raftFactory(1, storageName, partitionID, logStore1, logger, metrics) + mgr1, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore1, logger, metrics) require.NoError(t, err) // Initialize the raft replica @@ -424,7 +424,7 @@ func TestReplica_Initialize(t *testing.T) { logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) - mgr2, err := raftFactory(1, storageName, partitionID, logStore2, logger, metrics) + mgr2, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore2, logger, metrics) require.NoError(t, err) // Re-initialize Raft with the highest LSN @@ -711,7 +711,7 @@ func TestReplica_AppendLogEntry(t *testing.T) { mgr, err := raftFactory( 1, storageName, - partitionID, + NewPartitionKey(storageName, partitionID), logStore, logger, metrics, @@ -829,7 +829,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) // Configure replica - mgr, err := raftFactory(1, storageName, partitionID, logStore, logger, metrics) + mgr, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) require.NoError(t, err) for _, f := range setupFuncs { @@ -882,7 +882,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(env.recorder)) - recoveryMgr, err := raftFactory(1, env.storageName, env.partitionID, logStore, logger, env.metrics) + recoveryMgr, err := raftFactory(1, env.storageName, NewPartitionKey(env.storageName, env.partitionID), logStore, logger, env.metrics) require.NoError(t, err) // Initialize with the last known LSN @@ -1663,7 +1663,7 @@ func TestReplica_StorageConnection(t *testing.T) { // Create factory that 
connects replicas to storage raftFactory := DefaultFactoryWithNode(cfg.Raft, raftNode) - replica, err := raftFactory(1, storageName, partitionID, logStore, logger, NewMetrics()) + replica, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, replica.Close()) }) @@ -1694,14 +1694,14 @@ func TestReplica_StorageConnection(t *testing.T) { }) t.Run("multiple replicas for same partition key", func(t *testing.T) { - duplicateReplica, err := raftFactory(1, storageName, partitionID, logStore, logger, NewMetrics()) + duplicateReplica, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, duplicateReplica) }) t.Run("Register different replicas for different partition keys", func(t *testing.T) { partitionID := storage.PartitionID(2) - replicaTwo, err := raftFactory(1, storageName, partitionID, logStore, logger, NewMetrics()) + replicaTwo, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, replicaTwo) }) diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index 8d28357ae35..37a513c534c 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -141,7 +141,7 @@ func createRaftReplicaWithConfig(t *testing.T, ctx context.Context, raftCfg conf } raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, config.Options...) 
- return raftFactory(config.MemberID, storageName, config.PartitionID, logStore, logger, metrics) + return raftFactory(config.MemberID, storageName, NewPartitionKey(storageName, config.PartitionID), logStore, logger, metrics) } func createTempServer(t *testing.T, transport *GrpcTransport) (string, *grpc.Server) { diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 6adca834a2f..c5ac7db2c2c 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -64,6 +64,13 @@ func (f Factory) New( if f.raftCfg.Enabled { factory := f.raftFactory + // TODO: The PartitionKey will be retrieved from the incoming context when a Raft + // replica is booted up. This allows us to maintain a persistent key across + // all replicas of the partition. + // storageName will then represent the target storage where we intend on + // placing the new replicated partition. + partitionKey := raftmgr.NewPartitionKey(storageName, partitionID) + absoluteStateDir = getRaftPartitionPath(storageName, partitionID, absoluteStateDir) replicaLogStore, err := raftmgr.NewReplicaLogStore( @@ -84,7 +91,7 @@ func (f Factory) New( raftReplica, err := factory( 1, storageName, - partitionID, + partitionKey, replicaLogStore, logger, f.partitionMetrics.raft, -- GitLab From ad3fdc360a705163accc31fcc24b7957dac57ce8 Mon Sep 17 00:00:00 2001 From: James Liu Date: Wed, 2 Jul 2025 12:54:32 +1000 Subject: [PATCH 4/9] fix: Use binary partition ID consistently The NewKVRoutingTable() constructor creates a prefixed Badger database within the dedicated metadata partition, with partition ID = 1. The partition is reserved for Raft use, so normal repository partitions begin at ID = 2. Partition prefixes in Badger are of the form p/<partition_id>/... where partition_id is assumed to be encoded as binary with a fixed length. 
Additional keys relative to the partition can be nested underneath the prefix (the ... portion). The partition manager's ListPartitions() function uses a partitionIterator to attempt to retrieve a list of active partitions based on the available keys in Badger. Since there can be many keys that all begin with the same p/<partition_id>/ prefix, the extractPartitionID() function [1] is responsible for extracting only the <partition_id> portion of the key. It does this by assuming the length of the binary-encoded <partition_id> and unmarshaling that many bytes into an integer. Because the KVRoutingTable doesn't encode the partition ID as binary, the extractPartitionID() function will unmarshal past the end of the partition ID part of the key, returning a nonexistent partition ID. This also has the side effect of breaking one of the partition backup tests [2] as it changes the number of expected partitions. Currently, multiple instances of the invalid prefix would unmarshal to the same invalid partition ID due to the beginning of the keys being the same. For example: - p/1/raft/default/abc and - p/1/raft/default/def would produce the same invalid partition ID because: - We start with the key p/1/raft/default/2 - p/ is stripped out, leaving 1/raft/default/2 - We take the first 8 bytes of the key, as we assume the PartitionID's 64-bit integer occupies the entire space, leaving 1/raft/d. - 1/raft/d is unmarshaled into an invalid partition ID of 3544177194420154212. When we transition to using a SHA256 hash as the partition ID, the beginning of the hash would be different, causing an additional partition to be detected incorrectly and returned. Fix this by exporting the storagemgr.KeyPrefixPartition() function and using it in the NewKVRoutingTable constructor. 
[1] https://gitlab.com/gitlab-org/gitaly/-/blob/9a72666ea3a0a18d2f02d2821b880daee41d9767/internal/gitaly/storage/storagemgr/partition_manager.go#L763-777 [2] https://gitlab.com/gitlab-org/gitaly/blob/b52f1ecc47e28d380c62a01deedcff7abedc86fd/internal/backup/partition_backup_test.go#L46-50 --- internal/gitaly/storage/raftmgr/routing_table.go | 4 ++-- .../gitaly/storage/storagemgr/partition/testhelper_test.go | 4 +++- internal/gitaly/storage/storagemgr/partition_manager.go | 7 ++++--- .../gitaly/storage/storagemgr/partition_manager_test.go | 4 ++-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/routing_table.go b/internal/gitaly/storage/raftmgr/routing_table.go index 00628c9c60a..da214a218c0 100644 --- a/internal/gitaly/storage/raftmgr/routing_table.go +++ b/internal/gitaly/storage/raftmgr/routing_table.go @@ -14,7 +14,7 @@ import ( ) func routingKey(partitionKey *gitalypb.PartitionKey) []byte { - return []byte(fmt.Sprintf("/raft/%s/%d", partitionKey.GetAuthorityName(), partitionKey.GetPartitionId())) + return []byte(fmt.Sprintf("raft/%s/%d", partitionKey.GetAuthorityName(), partitionKey.GetPartitionId())) } // RoutingTableEntry represents a Raft cluster's routing state for a partition. @@ -50,7 +50,7 @@ type kvRoutingTable struct { // NewKVRoutingTable creates a new key-value based routing table implementation // that persists routing information using badgerDB. 
func NewKVRoutingTable(kvStore keyvalue.Store) *kvRoutingTable { - prefix := []byte(fmt.Sprintf("p/%d", storagemgr.MetadataPartitionID)) + prefix := storagemgr.KeyPrefixPartition(storagemgr.MetadataPartitionID) prefixedStore := keyvalue.NewPrefixedTransactioner(kvStore, prefix) return &kvRoutingTable{ kvStore: prefixedStore, diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index b4272213378..bca19504dea 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -43,6 +43,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" logrus "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -1733,7 +1734,8 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas for _, key := range raftmgr.RaftDBKeys { tc.expectedState.Database[string(key)] = &anypb.Any{} } - peerKey := fmt.Sprintf("p/1/raft/%s/%d", storageName, setup.PartitionID) + partitionPrefix := storagemgr.KeyPrefixPartition(storagemgr.MetadataPartitionID) + peerKey := fmt.Sprintf("%sraft/%s/%s", partitionPrefix, storageName, setup.PartitionID) if _, ok := tc.expectedState.Database[peerKey]; !ok { tc.expectedState.Database[peerKey] = &anypb.Any{} } diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index b0d0e1dbf86..0f6c6a9ad5e 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ 
-302,7 +302,8 @@ func clearStagingDirectory(stagingDir string) error { return nil } -func keyPrefixPartition(ptnID storage.PartitionID) []byte { +// KeyPrefixPartition returns the prefix to be used for KV-store entries scoped to ptnID. +func KeyPrefixPartition(ptnID storage.PartitionID) []byte { return []byte(fmt.Sprintf("%s%s/", PrefixPartition, ptnID.MarshalBinary())) } @@ -540,7 +541,7 @@ func (sm *StorageManager) startPartition(ctx context.Context, partitionID storag mgr := sm.partitionFactory.New( logger, partitionID, - keyvalue.NewPrefixedTransactioner(sm.database, keyPrefixPartition(partitionID)), + keyvalue.NewPrefixedTransactioner(sm.database, KeyPrefixPartition(partitionID)), sm.name, sm.path, absoluteStateDir, @@ -688,7 +689,7 @@ func (sm *StorageManager) ListPartitions(partitionID storage.PartitionID) (stora pi := &partitionIterator{ txn: txn, it: iter, - seek: keyPrefixPartition(partitionID), + seek: KeyPrefixPartition(partitionID), } return pi, nil diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go index 489443ba06b..c7afae49aea 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go @@ -1513,8 +1513,8 @@ func TestStorageManager_ListPartitions(t *testing.T) { // Creating multiple partition keys with duplicates require.NoError(t, storageMgr.database.Update(func(tx keyvalue.ReadWriter) error { for i := initialPartitionID; i < 4; i++ { - require.NoError(t, tx.Set(append(keyPrefixPartition(storage.PartitionID(i)), []byte("key1")...), nil)) - require.NoError(t, tx.Set(append(keyPrefixPartition(storage.PartitionID(i)), []byte("key2")...), nil)) + require.NoError(t, tx.Set(append(KeyPrefixPartition(storage.PartitionID(i)), []byte("key1")...), nil)) + require.NoError(t, tx.Set(append(KeyPrefixPartition(storage.PartitionID(i)), []byte("key2")...), nil)) } return nil })) -- GitLab From 
e1e1db1ab031283ccfafd72665bd0ff54ab6e0b1 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 1 Jul 2025 14:28:24 +1000 Subject: [PATCH 5/9] raft: Make PartitionKey opaque The composite PartitionKey of (storage_name, partition_id) didn't make sense once the partition crossed the node boundary and caused a bit of confusion. The intention of the key is to remain immutable after being minted by the node that first created the partition. Other nodes that are replicating the partition were expected to use the same PartitionKey for lookups, but maintain their own internal storage and partition ID mapping. Make the key opaque so there's a clear distinction between the immutable key minted by the authority node, and the storage and partition ID attributes used by the local nodes to store the replicated partition. Several changes were also required: Modify the RoutingTable's ApplyReplicaConfChange() function to take an explicit storageName parameter, representing the name of the storage that will be storing the replicated partition. The routingKey is also modified to use the partitionKey directly. Modify the SendMessage handler to validate only that the value of the key exists. Modify the filename generated by the SendSnapshot RPC to use the full PartitionKey value. 
--- internal/gitaly/service/raft/send_message.go | 9 +- internal/gitaly/service/raft/send_snapshot.go | 2 +- .../gitaly/service/raft/send_snapshot_test.go | 3 +- internal/gitaly/storage/raftmgr/replica.go | 14 +- .../storage/raftmgr/replica_registry.go | 11 +- .../gitaly/storage/raftmgr/replica_test.go | 2 +- .../gitaly/storage/raftmgr/routing_table.go | 11 +- .../storage/raftmgr/routing_table_test.go | 22 +-- .../storagemgr/partition/testhelper_test.go | 2 +- proto/cluster.proto | 17 +- proto/go/gitalypb/cluster.pb.go | 166 ++++++++---------- 11 files changed, 118 insertions(+), 141 deletions(-) diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go index 0ef4c500681..a7ab1c5250f 100644 --- a/internal/gitaly/service/raft/send_message.go +++ b/internal/gitaly/service/raft/send_message.go @@ -57,8 +57,6 @@ func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) erro func extractRaftMessageReq(req *gitalypb.RaftMessageRequest, s *Server) (*gitalypb.ReplicaID, *gitalypb.PartitionKey, error) { replicaID := req.GetReplicaId() partitionKey := replicaID.GetPartitionKey() - authorityName := partitionKey.GetAuthorityName() - partitionID := partitionKey.GetPartitionId() // The cluster ID protects Gitaly from cross-cluster interactions, which could potentially corrupt the clusters. // This is particularly crucial after disaster recovery so that an identical cluster is restored from backup. 
@@ -72,11 +70,8 @@ func extractRaftMessageReq(req *gitalypb.RaftMessageRequest, s *Server) (*gitaly req.GetClusterId(), s.cfg.Raft.ClusterID) } - if authorityName == "" { - return nil, nil, structerr.NewInvalidArgument("authority_name is required") - } - if partitionID == 0 { - return nil, nil, structerr.NewInvalidArgument("partition_id is required") + if partitionKey.GetValue() == "" { + return nil, nil, structerr.NewInvalidArgument("partition_key cannot be empty") } return replicaID, partitionKey, nil } diff --git a/internal/gitaly/service/raft/send_snapshot.go b/internal/gitaly/service/raft/send_snapshot.go index e0925e90ed2..5d29415fefc 100644 --- a/internal/gitaly/service/raft/send_snapshot.go +++ b/internal/gitaly/service/raft/send_snapshot.go @@ -27,7 +27,7 @@ func (s *Server) SendSnapshot(stream gitalypb.RaftService_SendSnapshotServer) (r return err } - fname := fmt.Sprintf("%016d-%016d-%016d%s", partitionKey.GetPartitionId(), raftMsg.GetMessage().Term, raftMsg.GetMessage().Index, ".snap") + fname := fmt.Sprintf("%s-%016d-%016d%s", partitionKey.GetValue(), raftMsg.GetMessage().Term, raftMsg.GetMessage().Index, ".snap") snapshotPath := filepath.Join(s.cfg.Raft.SnapshotDir, fname) snapshotFile, err := os.Create(snapshotPath) if err != nil { diff --git a/internal/gitaly/service/raft/send_snapshot_test.go b/internal/gitaly/service/raft/send_snapshot_test.go index d1f46b90bb0..d52ff9dc78b 100644 --- a/internal/gitaly/service/raft/send_snapshot_test.go +++ b/internal/gitaly/service/raft/send_snapshot_test.go @@ -1,6 +1,7 @@ package raft import ( + "fmt" "os" "path/filepath" "testing" @@ -104,7 +105,7 @@ func TestServer_SendSnapshot_Success(t *testing.T) { testhelper.RequireDirectoryState(t, cfg.Raft.SnapshotDir, "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, - "/0000000000000001-0000000000000002-0000000000000003.snap": {Mode: os.FileMode(0o644), Content: data}, + fmt.Sprintf("/%s-0000000000000002-0000000000000003.snap", partitionKey.GetValue()): 
{Mode: os.FileMode(0o644), Content: data}, }) } diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index c81fe11f76c..9328c1abbf8 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -2,6 +2,7 @@ package raftmgr import ( "context" + "crypto/sha256" "errors" "fmt" "runtime" @@ -240,7 +241,7 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF if err := raftEnabledStorage.RegisterReplica(replica); err != nil { return nil, fmt.Errorf("register replica %q in storage %q: %w", - partitionKey.String(), storageName, err) + partitionKey.GetValue(), storageName, err) } return replica, nil @@ -276,10 +277,10 @@ func NewReplica( logger = logger.WithFields(logging.Fields{ "component": "raft", - "raft.partitionKey": partitionKey.String(), + "raft.partitionKey": partitionKey.GetValue(), }) - scopedMetrics := metrics.Scope(partitionKey.AuthorityName) + scopedMetrics := metrics.Scope(logStore.authorityName) return &Replica{ memberID: memberID, @@ -319,7 +320,7 @@ func (replica *Replica) Initialize(ctx context.Context, appliedLSN storage.LSN) defer replica.mutex.Unlock() if replica.started { - return fmt.Errorf("raft replica %q already started", replica.partitionKey.String()) + return fmt.Errorf("raft replica %q already started", replica.partitionKey.GetValue()) } replica.started = true @@ -863,7 +864,7 @@ func (replica *Replica) processConfChange(entry raftpb.Entry) error { } // Apply the changes to the routing table - if err := routingTable.ApplyReplicaConfChange(replica.partitionKey, replicaChanges); err != nil { + if err := routingTable.ApplyReplicaConfChange(replica.logStore.authorityName, replica.partitionKey, replicaChanges); err != nil { return fmt.Errorf("applying conf changes: %w", err) } @@ -1067,8 +1068,7 @@ func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) // ever have a single PartitionKey, computed by 
the replica which first created the partition. func NewPartitionKey(storageName string, partitionID storage.PartitionID) *gitalypb.PartitionKey { return &gitalypb.PartitionKey{ - AuthorityName: storageName, - PartitionId: uint64(partitionID), + Value: fmt.Sprintf("%x", sha256.Sum256([]byte(storageName+partitionID.String()))), } } diff --git a/internal/gitaly/storage/raftmgr/replica_registry.go b/internal/gitaly/storage/raftmgr/replica_registry.go index 4a519944dcf..ee6ed2da86b 100644 --- a/internal/gitaly/storage/raftmgr/replica_registry.go +++ b/internal/gitaly/storage/raftmgr/replica_registry.go @@ -1,7 +1,6 @@ package raftmgr import ( - "fmt" "sync" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -10,10 +9,6 @@ import ( var errNoReplicaFound = structerr.NewNotFound("no replica found") -func partitionKeyToString(pk *gitalypb.PartitionKey) string { - return fmt.Sprintf("%d:%s", pk.GetPartitionId(), pk.GetAuthorityName()) -} - // ReplicaRegistry is an interface that defines the methods to register and retrieve replicas. type ReplicaRegistry interface { // GetReplica returns the replica for a given partition key. @@ -36,7 +31,7 @@ func NewReplicaRegistry() *raftRegistry { // GetReplica returns the replica for a given partitionKey. func (r *raftRegistry) GetReplica(key *gitalypb.PartitionKey) (RaftReplica, error) { - if mgr, ok := r.replicas.Load(partitionKeyToString(key)); ok { + if mgr, ok := r.replicas.Load(key.GetValue()); ok { return mgr.(RaftReplica), nil } return nil, errNoReplicaFound.WithMetadata("partition_key", key) @@ -44,10 +39,10 @@ func (r *raftRegistry) GetReplica(key *gitalypb.PartitionKey) (RaftReplica, erro // RegisterReplica registers a replica for a given partitionKey. 
func (r *raftRegistry) RegisterReplica(key *gitalypb.PartitionKey, replica RaftReplica) { - r.replicas.LoadOrStore(partitionKeyToString(key), replica) + r.replicas.LoadOrStore(key.GetValue(), replica) } // DeregisterReplica removes the replica with the given key from the registry. func (r *raftRegistry) DeregisterReplica(key *gitalypb.PartitionKey) { - r.replicas.Delete(partitionKeyToString(key)) + r.replicas.Delete(key.GetValue()) } diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index ea66555e788..3b8b7601edb 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -156,7 +156,7 @@ func TestReplica_Initialize(t *testing.T) { // Second initialization should fail err = mgr.Initialize(ctx, 0) - require.EqualError(t, err, fmt.Sprintf("raft replica %q already started", mgr.partitionKey.String())) + require.EqualError(t, err, fmt.Sprintf("raft replica %q already started", mgr.partitionKey.GetValue())) require.NoError(t, mgr.Close()) }) diff --git a/internal/gitaly/storage/raftmgr/routing_table.go b/internal/gitaly/storage/raftmgr/routing_table.go index da214a218c0..29fc4cd3b55 100644 --- a/internal/gitaly/storage/raftmgr/routing_table.go +++ b/internal/gitaly/storage/raftmgr/routing_table.go @@ -14,7 +14,7 @@ import ( ) func routingKey(partitionKey *gitalypb.PartitionKey) []byte { - return []byte(fmt.Sprintf("raft/%s/%d", partitionKey.GetAuthorityName(), partitionKey.GetPartitionId())) + return []byte(fmt.Sprintf("raft/%s", partitionKey.GetValue())) } // RoutingTableEntry represents a Raft cluster's routing state for a partition. 
@@ -38,7 +38,7 @@ type RoutingTable interface { Translate(partitionKey *gitalypb.PartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error) UpsertEntry(entry RoutingTableEntry) error - ApplyReplicaConfChange(partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error + ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error } // PersistentRoutingTable implements the RoutingTable interface with KV storage @@ -144,7 +144,7 @@ func (r *kvRoutingTable) Translate(partitionKey *gitalypb.PartitionKey, memberID return nil, fmt.Errorf("no address found for memberID %d", memberID) } -func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error { +func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error { routingTableEntry, err := r.GetEntry(partitionKey) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("getting routing table entry: %w", err) @@ -160,7 +160,6 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.Partition routingTableEntry.Term = changes.Term() routingTableEntry.Index = changes.Index() - authorityName := partitionKey.GetAuthorityName() metadata := changes.Metadata() for _, confChange := range changes.Changes() { @@ -179,7 +178,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.Partition replica := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: authorityName, + StorageName: storageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, } @@ -199,7 +198,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.Partition learner := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: 
authorityName, + StorageName: storageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_LEARNER, } diff --git a/internal/gitaly/storage/raftmgr/routing_table_test.go b/internal/gitaly/storage/raftmgr/routing_table_test.go index 47eb74d660d..890c181cd8a 100644 --- a/internal/gitaly/storage/raftmgr/routing_table_test.go +++ b/internal/gitaly/storage/raftmgr/routing_table_test.go @@ -112,7 +112,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -166,7 +166,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(2, 2, 1, 1, nil) changes.AddChange(2, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -196,7 +196,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddLearnerNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -223,7 +223,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(1, 1, 0, 1, createMetadata("localhost:1234")) changes.AddChange(0, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID should be non-zero") }) @@ -246,14 +246,14 @@ func 
TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) // Try to add the same node ID again changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID 1 already exists") }) @@ -276,14 +276,14 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) // Try to update a non-existent node with ID 2 changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) changes.AddChange(2, ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID 2 not found for update") }) @@ -321,7 +321,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(entry.Term, entry.Index, 1, 1, nil) changes.AddChange(1, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "no replicas to upsert") }) @@ -360,7 +360,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) changes.AddChange(1, 
ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -406,7 +406,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes.AddChange(1, ConfChangeRemoveNode) changes.AddChange(2, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index bca19504dea..b30556f88d6 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1735,7 +1735,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas tc.expectedState.Database[string(key)] = &anypb.Any{} } partitionPrefix := storagemgr.KeyPrefixPartition(storagemgr.MetadataPartitionID) - peerKey := fmt.Sprintf("%sraft/%s/%s", partitionPrefix, storageName, setup.PartitionID) + peerKey := fmt.Sprintf("%sraft/%s", partitionPrefix, raftmgr.NewPartitionKey(storageName, setup.PartitionID).GetValue()) if _, ok := tc.expectedState.Database[peerKey]; !ok { tc.expectedState.Database[peerKey] = &anypb.Any{} } diff --git a/proto/cluster.proto b/proto/cluster.proto index 3bc4f75ca3d..17fde5ae856 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -36,19 +36,16 @@ message RaftEntry { LogData data = 2; } - -// PartitionKey identifies which partition this replica belongs to. +// PartitionKey is a globally-unique identifier for a replicated partition. +// The replica which minted the partition is responsible for computing the PartitionKey, +// which is a hash of the storage name and partition ID. 
The key is then consumed +// as-is by other replicas wishing to store the partition. message PartitionKey { - // authority_name represents the storage that created the partition. - string authority_name = 1 [(gitaly.storage) = true]; - // partition_id is the local incrementing ID of a specific partition within a - // storage. Together with `authority_name`, this forms a unique identifier for - // a partition across the cluster. A partition belongs to a Raft group. - uint64 partition_id = 2; + // value is the SHA256 digest of the storage name and partition ID of the + // newly-minted partition. + string value = 1; } - - // ReplicaID uniquely identifies a replica in the Raft cluster. // It combines partition information with node-specific details. message ReplicaID { diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index cdd0b21433b..650a04451b4 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -134,15 +134,15 @@ func (x *RaftEntry) GetData() *RaftEntry_LogData { return nil } -// PartitionKey identifies which partition this replica belongs to. +// PartitionKey is a globally-unique identifier for a replicated partition. +// The replica which minted the partition is responsible for computing the PartitionKey, +// which is a hash of the storage name and partition ID. The key is then consumed +// as-is by other replicas wishing to store the partition. type PartitionKey struct { state protoimpl.MessageState `protogen:"open.v1"` - // authority_name represents the storage that created the partition. - AuthorityName string `protobuf:"bytes,1,opt,name=authority_name,json=authorityName,proto3" json:"authority_name,omitempty"` - // partition_id is the local incrementing ID of a specific partition within a - // storage. Together with `authority_name`, this forms a unique identifier for - // a partition across the cluster. A partition belongs to a Raft group. 
- PartitionId uint64 `protobuf:"varint,2,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + // value is the SHA256 digest of the storage name and partition ID of the + // newly-minted partition. + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -177,20 +177,13 @@ func (*PartitionKey) Descriptor() ([]byte, []int) { return file_cluster_proto_rawDescGZIP(), []int{1} } -func (x *PartitionKey) GetAuthorityName() string { +func (x *PartitionKey) GetValue() string { if x != nil { - return x.AuthorityName + return x.Value } return "" } -func (x *PartitionKey) GetPartitionId() uint64 { - if x != nil { - return x.PartitionId - } - return 0 -} - // ReplicaID uniquely identifies a replica in the Raft cluster. // It combines partition information with node-specific details. type ReplicaID struct { @@ -643,78 +636,75 @@ var file_cluster_proto_rawDesc = string([]byte{ 0x0a, 0x0a, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, - 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x5e, 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x2b, 0x0a, 0x0e, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, - 0x74, 0x79, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x04, 0x88, - 0xc6, 0x2c, 0x01, 0x52, 0x0d, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x4e, 0x61, - 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xf6, 0x02, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x49, 0x44, 0x12, 
0x39, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, - 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, - 0x0a, 0x09, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x36, - 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x49, 0x44, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, - 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, - 0x5d, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, - 0x0a, 0x18, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, - 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, - 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 
0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, - 0x45, 0x52, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, - 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, - 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, - 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, 0x52, 0x61, 0x66, - 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, - 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, - 0x12, 0x16, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, - 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, - 
0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, - 0x64, 0x22, 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, - 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, - 0x68, 0x6f, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x32, 0xc6, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, - 0x65, 0x6e, 0x64, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, - 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, - 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 
0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, - 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x24, 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xf6, 0x02, 0x0a, 0x09, + 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x39, 0x0a, 0x0d, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x14, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, + 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, + 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 
0x52, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, + 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, + 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, + 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5d, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x18, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, + 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, 0x45, 0x52, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, + 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, + 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, + 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, + 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, + 0x01, 0x0a, 0x1a, 0x52, 
0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, + 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, + 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x16, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, + 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, + 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, + 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, + 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x32, 0xc6, 0x01, + 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, + 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 
0x04, 0x08, 0x01, 0x10, 0x02, 0x28, + 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, + 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, + 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, + 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, }) var ( -- GitLab From c98680bacc2ab530b59b5501a800119a1c87451c Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 1 Jul 2025 11:52:28 +1000 Subject: [PATCH 6/9] raft: Update design doc to use PartitionKey The PartitionKey was previously referred to as the "Partition ID". Reword these instances in the design doc and update the wording on how the key is constructed. --- doc/raft.md | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/doc/raft.md b/doc/raft.md index 04d291eb290..5660f2dc532 100644 --- a/doc/raft.md +++ b/doc/raft.md @@ -28,17 +28,18 @@ optional storage capacity (e.g., HDD or SSD). ## Partition Identity and Membership Management -Each partition in a cluster is identified by a globally unique **Partition ID**. The Partition ID of a partition is generated -once when it is created. 
Under the hood, the Partition ID is a tuple of `(Authority Name, Local Partition ID)`, in which: +Each partition in a cluster is identified by a globally unique **PartitionKey**. The PartitionKey of a partition is +generated once when it is created. Under the hood, the PartitionKey is a SHA256 hash of +`(Authority Name, Local Partition ID)`, in which: - Authority Name: Represents the storage that created the partition. - Local Partition ID: A local, monotonic identifier for the partition within the authority's storage. -This identity system allows distributed partition creation without the need for a global ID registry. Although the -authority storage name is a part of the Partition ID, it does not imply ownership of the partition. A partition can move -freely to other storages after creation. +This identity system allows distributed partition creation without the need for a global ID registry. A partition can +move freely to other storages after creation due to replication, and ownership of a partition can change based on the +current leadership. -Each partition is a Raft group with one or more members. Raft groups are also identified by the Partition ID due to +Each partition is a Raft group with one or more members. Raft groups are also identified by the PartitionKey due to the one-to-one relationship. The `raftmgr.Replica` oversees all Raft activities for a Raft group member. Internally, etcd/raft assigns an @@ -56,7 +57,7 @@ scope of `etcd/raft` integration. Since Gitaly follows a multi-Raft architecture, the Member ID alone is insufficient to precisely locate a partition or Raft group replica. Therefore, each replica (including the leader) is identified using a -**Replica ID**, which consists of `(Partition ID, Member ID, Replica Storage Name)`. +**Replica ID**, which consists of `(PartitionKey, Member ID, Replica Storage Name)`. To ensure fault tolerance in a quorum-based system, a Raft cluster requires a minimum replication factor of 3. 
It can tolerate up to `(n-1)/2` failures. For example, a 3-replica cluster can tolerate 1 failure. @@ -65,7 +66,7 @@ The cluster maintains a global, eventually consistent **routing table** for look in the table includes: ```plaintext -Partition ID: [ +PartitionKey: [ RelativePath: "@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git" Replicas: [ @@ -83,8 +84,8 @@ groups and the high update frequency. The routing table and advertised storage a gossip network. The `RelativePath` field in the routing table aims to maintain backward compatibility with GitLab-specific clients. It is -removed after clients fully transition to the new Partition ID system. The `Term` and `Index` fields ensure the order of -updates. Entries of the same Partition ID with either a lower term or index are replaced by newer ones. +removed after clients fully transition to the new PartitionKey system. The `Term` and `Index` fields ensure the order of +updates. Entries of the same PartitionKey with either a lower term or index are replaced by newer ones. The following chart illustrates a simplified semantic organization of data (not actual data placement) on a Gitaly server. -- GitLab From ba411bcfecdd8062adc4e76ce24d6cd7d7226eef Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 1 Jul 2025 14:37:01 +1000 Subject: [PATCH 7/9] raft: Rename authorityName to storageName The ReplicaLogStore contains an authorityName field, and the NewReplicaLogStore() constructor takes an authorityName parameter. This name only makes sense if the ReplicaLogStore is being created by the storage which is minting the partition. When the store is being created on another node due to the partition being replicated, the argument would be the local storage name. Rename authorityName to storageName to accurately represent what is being passed in. 
--- internal/gitaly/storage/raftmgr/replica.go | 4 +- .../storage/raftmgr/replica_log_store.go | 50 +++++++++---------- .../storage/raftmgr/replica_log_store_test.go | 28 +++++------ 3 files changed, 41 insertions(+), 41 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index 9328c1abbf8..bb4b556fdda 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -280,7 +280,7 @@ func NewReplica( "raft.partitionKey": partitionKey.GetValue(), }) - scopedMetrics := metrics.Scope(logStore.authorityName) + scopedMetrics := metrics.Scope(logStore.storageName) return &Replica{ memberID: memberID, @@ -864,7 +864,7 @@ func (replica *Replica) processConfChange(entry raftpb.Entry) error { } // Apply the changes to the routing table - if err := routingTable.ApplyReplicaConfChange(replica.logStore.authorityName, replica.partitionKey, replicaChanges); err != nil { + if err := routingTable.ApplyReplicaConfChange(replica.logStore.storageName, replica.partitionKey, replicaChanges); err != nil { return fmt.Errorf("applying conf changes: %w", err) } diff --git a/internal/gitaly/storage/raftmgr/replica_log_store.go b/internal/gitaly/storage/raftmgr/replica_log_store.go index 3dd58099d5b..c5a098e9620 100644 --- a/internal/gitaly/storage/raftmgr/replica_log_store.go +++ b/internal/gitaly/storage/raftmgr/replica_log_store.go @@ -128,17 +128,17 @@ var ( Not ready to be used */ type ReplicaLogStore struct { - ctx context.Context - mutex sync.Mutex - authorityName string - partitionID storage.PartitionID - database keyvalue.Transactioner - localLog *log.Manager - committedLSN storage.LSN - lastTerm uint64 - consumer storage.LogConsumer - stagingDir string - snapshotter ReplicaSnapshotter + ctx context.Context + mutex sync.Mutex + storageName string + partitionID storage.PartitionID + database keyvalue.Transactioner + localLog *log.Manager + committedLSN storage.LSN + lastTerm uint64 
+ consumer storage.LogConsumer + stagingDir string + snapshotter ReplicaSnapshotter // hooks is a collection of hooks, used in test environment to intercept critical events hooks replicaHooks @@ -155,7 +155,7 @@ func raftManifestPath(logEntryPath string) string { // NewReplicaLogStore creates and initializes a new Storage instance. func NewReplicaLogStore( - authorityName string, + storageName string, partitionID storage.PartitionID, raftCfg config.Raft, db keyvalue.Transactioner, @@ -176,7 +176,7 @@ func NewReplicaLogStore( // Initialize the local log manager without a consumer since notifications // should only be sent when entries are committed, not when they're appended localLog := log.NewManager( - authorityName, + storageName, partitionID, stagingDirectory, stateDirectory, @@ -186,23 +186,23 @@ func NewReplicaLogStore( logger = logger.WithFields(lg.Fields{ "partition_id": partitionID, - "storage_name": authorityName, + "storage_name": storageName, }) - snapshotter, err := NewReplicaSnapshotter(raftCfg, logger, metrics.Scope(authorityName)) + snapshotter, err := NewReplicaSnapshotter(raftCfg, logger, metrics.Scope(storageName)) if err != nil { return nil, fmt.Errorf("create raft snapshotter: %w", err) } return &ReplicaLogStore{ - database: db, - authorityName: authorityName, - partitionID: partitionID, - localLog: localLog, - consumer: consumer, - stagingDir: stagingDirectory, - snapshotter: snapshotter, - hooks: noopHooks(), + database: db, + storageName: storageName, + partitionID: partitionID, + localLog: localLog, + consumer: consumer, + stagingDir: stagingDirectory, + snapshotter: snapshotter, + hooks: noopHooks(), }, nil } @@ -296,7 +296,7 @@ func (s *ReplicaLogStore) initialize(ctx context.Context, appliedLSN storage.LSN } if s.consumer != nil { - s.consumer.NotifyNewEntries(s.authorityName, s.partitionID, s.localLog.LowWaterMark(), s.committedLSN) + s.consumer.NotifyNewEntries(s.storageName, s.partitionID, s.localLog.LowWaterMark(), s.committedLSN) } } 
@@ -493,7 +493,7 @@ func (s *ReplicaLogStore) saveHardState(hardState raftpb.HardState) error { } if s.consumer != nil { - s.consumer.NotifyNewEntries(s.authorityName, s.partitionID, s.localLog.LowWaterMark(), committedLSN) + s.consumer.NotifyNewEntries(s.storageName, s.partitionID, s.localLog.LowWaterMark(), committedLSN) } return nil diff --git a/internal/gitaly/storage/raftmgr/replica_log_store_test.go b/internal/gitaly/storage/raftmgr/replica_log_store_test.go index 1f1ab964fe9..cd7a649c102 100644 --- a/internal/gitaly/storage/raftmgr/replica_log_store_test.go +++ b/internal/gitaly/storage/raftmgr/replica_log_store_test.go @@ -206,7 +206,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Notify for the first time. require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(4), highWaterMark: storage.LSN(3), @@ -249,7 +249,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Notify from low-water mark to the committedLSN for the first time. 
require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(3), highWaterMark: storage.LSN(3), @@ -1304,7 +1304,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from low water mark -> 1 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), @@ -1318,13 +1318,13 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from low water mark -> 2 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(2), @@ -1338,19 +1338,19 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from low water mark -> 3 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(2), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(3), @@ -1386,7 +1386,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from 1 -> 1 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: 
logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), @@ -1404,13 +1404,13 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from 2 -> 2 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(2), @@ -1424,19 +1424,19 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from 2 -> 3 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(2), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(3), -- GitLab From 7cd6778a7bd1fcd0dc9fc51ea719714e1dfc8e80 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 15 Jul 2025 17:14:06 +1000 Subject: [PATCH 8/9] doc: Update Replica comment about replica identification The entire gitalypb.ReplicaID message is used to identify an instance of a replica on a given Gitaly storage. 
--- internal/gitaly/storage/raftmgr/replica.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index bb4b556fdda..946f7bd75b4 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -143,8 +143,7 @@ func WithEntryRecorder(recorder *ReplicaEntryRecorder) OptionFunc { // Internally, the Replica integrates with etcd/raft to implement the Raft consensus algorithm // and implements the storage.LogManager interface to interact with Gitaly's transaction system. // -// A Replica is identified by a Replica ID, which consists of -// (Partition ID, Member ID, Replica Storage Name). +// A Replica is identified by a gitalypb.ReplicaID. type Replica struct { mutex sync.Mutex -- GitLab From 8fcab422a25e1bd61b622aaa4659ccd52a2297bc Mon Sep 17 00:00:00 2001 From: James Liu Date: Wed, 16 Jul 2025 16:28:42 +1000 Subject: [PATCH 9/9] raft: Rename PartitionKey to RaftPartitionKey Adding the Raft prefix reduces ambiguity about where and how the key should be used. Other areas of the codebase define similar sounding keys, such as the backup.PartitionInfo composite key, and the storage.PartitionID which represents a locally-allocated partition ID. We should make it clear that the RaftPartitionKey is meant to represent a key that is unique across an entire Raft cluster, and can be provided to any node of that cluster in order to retrieve its local replica of a partition. 
--- internal/gitaly/service/raft/send_message.go | 2 +- .../gitaly/storage/raftmgr/grpc_transport.go | 14 +- internal/gitaly/storage/raftmgr/replica.go | 14 +- .../storage/raftmgr/replica_registry.go | 12 +- .../gitaly/storage/raftmgr/routing_table.go | 14 +- proto/cluster.proto | 8 +- proto/go/gitalypb/cluster.pb.go | 164 +++++++++--------- 7 files changed, 114 insertions(+), 114 deletions(-) diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go index a7ab1c5250f..d7248952cb9 100644 --- a/internal/gitaly/service/raft/send_message.go +++ b/internal/gitaly/service/raft/send_message.go @@ -54,7 +54,7 @@ func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) erro return stream.SendAndClose(&gitalypb.RaftMessageResponse{}) } -func extractRaftMessageReq(req *gitalypb.RaftMessageRequest, s *Server) (*gitalypb.ReplicaID, *gitalypb.PartitionKey, error) { +func extractRaftMessageReq(req *gitalypb.RaftMessageRequest, s *Server) (*gitalypb.ReplicaID, *gitalypb.RaftPartitionKey, error) { replicaID := req.GetReplicaId() partitionKey := replicaID.GetPartitionKey() diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go index d2c9811aff8..30cb063be66 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport.go @@ -31,10 +31,10 @@ type Transport interface { // Send dispatches a batch of Raft messages. It returns an error if the sending fails. This function receives a // context, the list of messages to send and a function that returns the path of WAL directory of a particular // log entry. The implementation must respect input context's cancellation. 
- Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, messages []raftpb.Message) error + Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.RaftPartitionKey, messages []raftpb.Message) error // Receive receives a Raft message and processes it. - Receive(ctx context.Context, partitionKey *gitalypb.PartitionKey, raftMsg raftpb.Message) error - SendSnapshot(ctx context.Context, partitionKey *gitalypb.PartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) error + Receive(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, raftMsg raftpb.Message) error + SendSnapshot(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) error } // GrpcTransport is a gRPC transport implementation for sending Raft messages across nodes. @@ -59,7 +59,7 @@ func NewGrpcTransport(logger log.Logger, cfg config.Cfg, routingTable RoutingTab } // Send sends Raft messages to the appropriate nodes. 
-func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, messages []raftpb.Message) error { +func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.RaftPartitionKey, messages []raftpb.Message) error { messagesByNode, err := t.prepareRaftMessageRequests(ctx, logReader, partitionKey, messages) if err != nil { return fmt.Errorf("preparing raft messages: %w", err) @@ -94,7 +94,7 @@ func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, p return nil } -func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, msgs []raftpb.Message) (map[string][]*gitalypb.RaftMessageRequest, error) { +func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.RaftPartitionKey, msgs []raftpb.Message) (map[string][]*gitalypb.RaftMessageRequest, error) { messagesByAddress := make(map[string][]*gitalypb.RaftMessageRequest) messagesByAddressMutex := sync.Mutex{} g := &errgroup.Group{} @@ -209,7 +209,7 @@ func (t *GrpcTransport) packLogData(ctx context.Context, lsn storage.LSN, messag } // Receive receives a stream of Raft messages and processes them. -func (t *GrpcTransport) Receive(ctx context.Context, partitionKey *gitalypb.PartitionKey, raftMsg raftpb.Message) error { +func (t *GrpcTransport) Receive(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, raftMsg raftpb.Message) error { // Retrieve the replica from the registry, assumption is that all the messages are from the same partition key. replica, err := t.registry.GetReplica(partitionKey) if err != nil { @@ -290,7 +290,7 @@ func unpackLogData(msg *gitalypb.RaftEntry, logEntryPath string) error { } // SendSnapshot sends a snapshot of a partition to a specified node in the cluster. 
-func (t *GrpcTransport) SendSnapshot(ctx context.Context, pk *gitalypb.PartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) (returnedErr error) { +func (t *GrpcTransport) SendSnapshot(ctx context.Context, pk *gitalypb.RaftPartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) (returnedErr error) { followerMemberID := message.To // Find replica's address as recipient of snapshot diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index 946f7bd75b4..cbd00a0cedb 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -151,7 +151,7 @@ type Replica struct { cancel context.CancelFunc memberID uint64 // Member ID of the replica - partitionKey *gitalypb.PartitionKey + partitionKey *gitalypb.RaftPartitionKey node raft.Node // etcd/raft node representation raftCfg config.Raft // etcd/raft configurations options ReplicaOptions // Additional replica configuration @@ -206,7 +206,7 @@ func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ReplicaOptions, error type RaftReplicaFactory func( memberID uint64, storageName string, - partitionKey *gitalypb.PartitionKey, + partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -218,7 +218,7 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF return func( memberID uint64, storageName string, - partitionKey *gitalypb.PartitionKey, + partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -253,7 +253,7 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF // the Raft protocol operation. 
func NewReplica( memberID uint64, - partitionKey *gitalypb.PartitionKey, + partitionKey *gitalypb.RaftPartitionKey, raftCfg config.Raft, logStore *ReplicaLogStore, raftEnabledStorage *RaftEnabledStorage, @@ -1064,9 +1064,9 @@ func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) } // NewPartitionKey creates a partition key for a newly-minted partition. A partition should only -// ever have a single PartitionKey, computed by the replica which first created the partition. -func NewPartitionKey(storageName string, partitionID storage.PartitionID) *gitalypb.PartitionKey { - return &gitalypb.PartitionKey{ +// ever have a single RaftPartitionKey, computed by the replica which first created the partition. +func NewPartitionKey(storageName string, partitionID storage.PartitionID) *gitalypb.RaftPartitionKey { + return &gitalypb.RaftPartitionKey{ Value: fmt.Sprintf("%x", sha256.Sum256([]byte(storageName+partitionID.String()))), } } diff --git a/internal/gitaly/storage/raftmgr/replica_registry.go b/internal/gitaly/storage/raftmgr/replica_registry.go index ee6ed2da86b..106e0f71252 100644 --- a/internal/gitaly/storage/raftmgr/replica_registry.go +++ b/internal/gitaly/storage/raftmgr/replica_registry.go @@ -12,11 +12,11 @@ var errNoReplicaFound = structerr.NewNotFound("no replica found") // ReplicaRegistry is an interface that defines the methods to register and retrieve replicas. type ReplicaRegistry interface { // GetReplica returns the replica for a given partition key. - GetReplica(key *gitalypb.PartitionKey) (RaftReplica, error) + GetReplica(key *gitalypb.RaftPartitionKey) (RaftReplica, error) // RegisterReplica registers a replica for a given partition key. - RegisterReplica(key *gitalypb.PartitionKey, replica RaftReplica) + RegisterReplica(key *gitalypb.RaftPartitionKey, replica RaftReplica) // DeregisterReplica removes the replica with the given key from the registry. 
- DeregisterReplica(key *gitalypb.PartitionKey) + DeregisterReplica(key *gitalypb.RaftPartitionKey) } // raftRegistry is a concrete implementation of the ReplicaRegistry interface. @@ -30,7 +30,7 @@ func NewReplicaRegistry() *raftRegistry { } // GetReplica returns the replica for a given partitionKey. -func (r *raftRegistry) GetReplica(key *gitalypb.PartitionKey) (RaftReplica, error) { +func (r *raftRegistry) GetReplica(key *gitalypb.RaftPartitionKey) (RaftReplica, error) { if mgr, ok := r.replicas.Load(key.GetValue()); ok { return mgr.(RaftReplica), nil } @@ -38,11 +38,11 @@ func (r *raftRegistry) GetReplica(key *gitalypb.PartitionKey) (RaftReplica, erro } // RegisterReplica registers a replica for a given partitionKey. -func (r *raftRegistry) RegisterReplica(key *gitalypb.PartitionKey, replica RaftReplica) { +func (r *raftRegistry) RegisterReplica(key *gitalypb.RaftPartitionKey, replica RaftReplica) { r.replicas.LoadOrStore(key.GetValue(), replica) } // DeregisterReplica removes the replica with the given key from the registry. 
-func (r *raftRegistry) DeregisterReplica(key *gitalypb.PartitionKey) { +func (r *raftRegistry) DeregisterReplica(key *gitalypb.RaftPartitionKey) { r.replicas.Delete(key.GetValue()) } diff --git a/internal/gitaly/storage/raftmgr/routing_table.go b/internal/gitaly/storage/raftmgr/routing_table.go index 29fc4cd3b55..6cbb7a20e20 100644 --- a/internal/gitaly/storage/raftmgr/routing_table.go +++ b/internal/gitaly/storage/raftmgr/routing_table.go @@ -13,7 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) -func routingKey(partitionKey *gitalypb.PartitionKey) []byte { +func routingKey(partitionKey *gitalypb.RaftPartitionKey) []byte { return []byte(fmt.Sprintf("raft/%s", partitionKey.GetValue())) } @@ -35,10 +35,10 @@ type ReplicaMetadata struct { // RoutingTable handles translation between member IDs and addresses type RoutingTable interface { - Translate(partitionKey *gitalypb.PartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) - GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error) + Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) + GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) UpsertEntry(entry RoutingTableEntry) error - ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error + ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error } // PersistentRoutingTable implements the RoutingTable interface with KV storage @@ -108,7 +108,7 @@ func (r *kvRoutingTable) UpsertEntry(entry RoutingTableEntry) error { } // GetEntry retrieves a routing table entry -func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error) { +func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) { key := routingKey(partitionKey) var entry RoutingTableEntry @@ -129,7 
+129,7 @@ func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.PartitionKey) (*Routing } // Translate returns the storage name and address for a given partition key and member ID -func (r *kvRoutingTable) Translate(partitionKey *gitalypb.PartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) { +func (r *kvRoutingTable) Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) { entry, err := r.GetEntry(partitionKey) if err != nil { return nil, fmt.Errorf("get entry: %w", err) @@ -144,7 +144,7 @@ func (r *kvRoutingTable) Translate(partitionKey *gitalypb.PartitionKey, memberID return nil, fmt.Errorf("no address found for memberID %d", memberID) } -func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error { +func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error { routingTableEntry, err := r.GetEntry(partitionKey) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("getting routing table entry: %w", err) diff --git a/proto/cluster.proto b/proto/cluster.proto index 17fde5ae856..74dffe5c4f3 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -36,11 +36,11 @@ message RaftEntry { LogData data = 2; } -// PartitionKey is a globally-unique identifier for a replicated partition. -// The replica which minted the partition is responsible for computing the PartitionKey, +// RaftPartitionKey is a globally-unique identifier for a replicated partition. +// The replica which minted the partition is responsible for computing the RaftPartitionKey, // which is a hash of the storage name and partition ID. The key is then consumed // as-is by other replicas wishing to store the partition. -message PartitionKey { +message RaftPartitionKey { // value is the SHA256 digest of the storage name and partition ID of the // newly-minted partition. 
string value = 1; @@ -50,7 +50,7 @@ message PartitionKey { // It combines partition information with node-specific details. message ReplicaID { // partition_key identifies which partition this replica belongs to. - PartitionKey partition_key = 1; + RaftPartitionKey partition_key = 1; // member_id is the unique identifier assigned by etcd/raft. uint64 member_id = 2; // storage_name is the name of the storage where this replica is hosted. diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index 650a04451b4..a6825dae943 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -134,11 +134,11 @@ func (x *RaftEntry) GetData() *RaftEntry_LogData { return nil } -// PartitionKey is a globally-unique identifier for a replicated partition. -// The replica which minted the partition is responsible for computing the PartitionKey, +// RaftPartitionKey is a globally-unique identifier for a replicated partition. +// The replica which minted the partition is responsible for computing the RaftPartitionKey, // which is a hash of the storage name and partition ID. The key is then consumed // as-is by other replicas wishing to store the partition. -type PartitionKey struct { +type RaftPartitionKey struct { state protoimpl.MessageState `protogen:"open.v1"` // value is the SHA256 digest of the storage name and partition ID of the // newly-minted partition. 
@@ -147,20 +147,20 @@ type PartitionKey struct { sizeCache protoimpl.SizeCache } -func (x *PartitionKey) Reset() { - *x = PartitionKey{} +func (x *RaftPartitionKey) Reset() { + *x = RaftPartitionKey{} mi := &file_cluster_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *PartitionKey) String() string { +func (x *RaftPartitionKey) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PartitionKey) ProtoMessage() {} +func (*RaftPartitionKey) ProtoMessage() {} -func (x *PartitionKey) ProtoReflect() protoreflect.Message { +func (x *RaftPartitionKey) ProtoReflect() protoreflect.Message { mi := &file_cluster_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -172,12 +172,12 @@ func (x *PartitionKey) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PartitionKey.ProtoReflect.Descriptor instead. -func (*PartitionKey) Descriptor() ([]byte, []int) { +// Deprecated: Use RaftPartitionKey.ProtoReflect.Descriptor instead. +func (*RaftPartitionKey) Descriptor() ([]byte, []int) { return file_cluster_proto_rawDescGZIP(), []int{1} } -func (x *PartitionKey) GetValue() string { +func (x *RaftPartitionKey) GetValue() string { if x != nil { return x.Value } @@ -189,7 +189,7 @@ func (x *PartitionKey) GetValue() string { type ReplicaID struct { state protoimpl.MessageState `protogen:"open.v1"` // partition_key identifies which partition this replica belongs to. - PartitionKey *PartitionKey `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` + PartitionKey *RaftPartitionKey `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` // member_id is the unique identifier assigned by etcd/raft. 
MemberId uint64 `protobuf:"varint,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"` // storage_name is the name of the storage where this replica is hosted. @@ -232,7 +232,7 @@ func (*ReplicaID) Descriptor() ([]byte, []int) { return file_cluster_proto_rawDescGZIP(), []int{2} } -func (x *ReplicaID) GetPartitionKey() *PartitionKey { +func (x *ReplicaID) GetPartitionKey() *RaftPartitionKey { if x != nil { return x.PartitionKey } @@ -636,75 +636,75 @@ var file_cluster_proto_rawDesc = string([]byte{ 0x0a, 0x0a, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, - 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x24, 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xf6, 0x02, 0x0a, 0x09, - 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x39, 0x0a, 0x0d, 0x70, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x14, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, - 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, - 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, - 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x04, 
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, - 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x52, 0x65, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, - 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, - 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, - 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5d, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x18, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, - 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, - 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, 0x45, 0x52, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, - 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, - 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, + 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x28, 0x0a, 0x10, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 
0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, + 0xfa, 0x02, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x3d, 0x0a, + 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, + 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, + 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, 0x0a, 0x09, + 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x6f, + 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x08, + 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, - 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, - 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, - 0x01, 0x0a, 0x1a, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, - 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 
0x32, - 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, - 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x16, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, - 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, - 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, + 0x44, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, + 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5d, 0x0a, + 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x18, + 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, + 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, + 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, 0x45, 0x52, + 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, 0x01, 0x0a, + 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1d, 
0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, - 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, - 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x32, 0xc6, 0x01, - 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, - 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 
0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, - 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, - 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, - 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, - 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, - 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6d, 0x73, + 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x16, + 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, + 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x73, + 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, + 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 
0x12, 0x20, + 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, 0x69, 0x7a, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x53, 0x69, 0x7a, 0x65, 0x32, 0xc6, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, + 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, + 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, 0x65, 0x6e, + 0x64, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, + 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x42, 0x34, + 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, + 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, + 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x70, 0x62, 
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( @@ -724,7 +724,7 @@ var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_cluster_proto_goTypes = []any{ (ReplicaID_ReplicaType)(0), // 0: gitaly.ReplicaID.ReplicaType (*RaftEntry)(nil), // 1: gitaly.RaftEntry - (*PartitionKey)(nil), // 2: gitaly.PartitionKey + (*RaftPartitionKey)(nil), // 2: gitaly.RaftPartitionKey (*ReplicaID)(nil), // 3: gitaly.ReplicaID (*RaftMessageRequest)(nil), // 4: gitaly.RaftMessageRequest (*RaftMessageResponse)(nil), // 5: gitaly.RaftMessageResponse @@ -736,7 +736,7 @@ var file_cluster_proto_goTypes = []any{ } var file_cluster_proto_depIdxs = []int32{ 8, // 0: gitaly.RaftEntry.data:type_name -> gitaly.RaftEntry.LogData - 2, // 1: gitaly.ReplicaID.partition_key:type_name -> gitaly.PartitionKey + 2, // 1: gitaly.ReplicaID.partition_key:type_name -> gitaly.RaftPartitionKey 9, // 2: gitaly.ReplicaID.metadata:type_name -> gitaly.ReplicaID.Metadata 0, // 3: gitaly.ReplicaID.type:type_name -> gitaly.ReplicaID.ReplicaType 3, // 4: gitaly.RaftMessageRequest.replica_id:type_name -> gitaly.ReplicaID -- GitLab