diff --git a/doc/raft.md b/doc/raft.md index 04d291eb2903ceee487a59ff00402180d25bcb1c..5660f2dc5328b97f5fc9d4e1d92fe03c57c0d9ad 100644 --- a/doc/raft.md +++ b/doc/raft.md @@ -28,17 +28,18 @@ optional storage capacity (e.g., HDD or SSD). ## Partition Identity and Membership Management -Each partition in a cluster is identified by a globally unique **Partition ID**. The Partition ID of a partition is generated -once when it is created. Under the hood, the Partition ID is a tuple of `(Authority Name, Local Partition ID)`, in which: +Each partition in a cluster is identified by a globally unique **PartitionKey**. The PartitionKey of a partition is +generated once when it is created. Under the hood, the PartitionKey is a SHA256 hash of +`(Authority Name, Local Partition ID)`, in which: - Authority Name: Represents the storage that created the partition. - Local Partition ID: A local, monotonic identifier for the partition within the authority's storage. -This identity system allows distributed partition creation without the need for a global ID registry. Although the -authority storage name is a part of the Partition ID, it does not imply ownership of the partition. A partition can move -freely to other storages after creation. +This identity system allows distributed partition creation without the need for a global ID registry. A partition can +move freely to other storages after creation due to replication, and ownership of a partition can change based on the +current leadership. -Each partition is a Raft group with one or more members. Raft groups are also identified by the Partition ID due to +Each partition is a Raft group with one or more members. Raft groups are also identified by the PartitionKey due to the one-to-one relationship. The `raftmgr.Replica` oversees all Raft activities for a Raft group member. Internally, etcd/raft assigns an @@ -56,7 +57,7 @@ scope of `etcd/raft` integration. 
Since Gitaly follows a multi-Raft architecture, the Member ID alone is insufficient to precisely locate a partition or Raft group replica. Therefore, each replica (including the leader) is identified using a -**Replica ID**, which consists of `(Partition ID, Member ID, Replica Storage Name)`. +**Replica ID**, which consists of `(PartitionKey, Member ID, Replica Storage Name)`. To ensure fault tolerance in a quorum-based system, a Raft cluster requires a minimum replication factor of 3. It can tolerate up to `(n-1)/2` failures. For example, a 3-replica cluster can tolerate 1 failure. @@ -65,7 +66,7 @@ The cluster maintains a global, eventually consistent **routing table** for look in the table includes: ```plaintext -Partition ID: [ +PartitionKey: [ RelativePath: "@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git" Replicas: [ @@ -83,8 +84,8 @@ groups and the high update frequency. The routing table and advertised storage a gossip network. The `RelativePath` field in the routing table aims to maintain backward compatibility with GitLab-specific clients. It is -removed after clients fully transition to the new Partition ID system. The `Term` and `Index` fields ensure the order of -updates. Entries of the same Partition ID with either a lower term or index are replaced by newer ones. +removed after clients fully transition to the new PartitionKey system. The `Term` and `Index` fields ensure the order of +updates. Entries of the same PartitionKey with either a lower term or index are replaced by newer ones. The following chart illustrates a simplified semantic organization of data (not actual data placement) on a Gitaly server. 
diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go index 0ef4c500681bee0aa3d4c4c714c46c95a30a9909..d7248952cb93a9b775cdf98b442903d3b8f9f6b6 100644 --- a/internal/gitaly/service/raft/send_message.go +++ b/internal/gitaly/service/raft/send_message.go @@ -54,11 +54,9 @@ func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) erro return stream.SendAndClose(&gitalypb.RaftMessageResponse{}) } -func extractRaftMessageReq(req *gitalypb.RaftMessageRequest, s *Server) (*gitalypb.ReplicaID, *gitalypb.PartitionKey, error) { +func extractRaftMessageReq(req *gitalypb.RaftMessageRequest, s *Server) (*gitalypb.ReplicaID, *gitalypb.RaftPartitionKey, error) { replicaID := req.GetReplicaId() partitionKey := replicaID.GetPartitionKey() - authorityName := partitionKey.GetAuthorityName() - partitionID := partitionKey.GetPartitionId() // The cluster ID protects Gitaly from cross-cluster interactions, which could potentially corrupt the clusters. // This is particularly crucial after disaster recovery so that an identical cluster is restored from backup. 
@@ -72,11 +70,8 @@ func extractRaftMessageReq(req *gitalypb.RaftMessageRequest, s *Server) (*gitaly req.GetClusterId(), s.cfg.Raft.ClusterID) } - if authorityName == "" { - return nil, nil, structerr.NewInvalidArgument("authority_name is required") - } - if partitionID == 0 { - return nil, nil, structerr.NewInvalidArgument("partition_id is required") + if partitionKey.GetValue() == "" { + return nil, nil, structerr.NewInvalidArgument("partition_key cannot be empty") } return replicaID, partitionKey, nil } diff --git a/internal/gitaly/service/raft/send_message_test.go b/internal/gitaly/service/raft/send_message_test.go index 50fe2f2a15b09cf47ef987c04ac23c1a5eb86d1a..016a3a5c8bb1e4fd4ad4e9da9eb79a413d3266ba 100644 --- a/internal/gitaly/service/raft/send_message_test.go +++ b/internal/gitaly/service/raft/send_message_test.go @@ -56,10 +56,7 @@ func TestServer_SendMessage(t *testing.T) { registry := storage.(*raftmgr.RaftEnabledStorage).GetReplicaRegistry() replica := &mockRaftReplica{} - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - } + partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) // Register storage two @@ -83,11 +80,8 @@ func TestServer_SendMessage(t *testing.T) { req: &gitalypb.RaftMessageRequest{ ClusterId: "test-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameOne, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -100,11 +94,8 @@ func TestServer_SendMessage(t *testing.T) { req: &gitalypb.RaftMessageRequest{ ClusterId: "test-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameTwo, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameTwo, + PartitionKey: 
raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -116,11 +107,8 @@ func TestServer_SendMessage(t *testing.T) { desc: "missing cluster ID", req: &gitalypb.RaftMessageRequest{ ReplicaId: &gitalypb.ReplicaID{ - StorageName: "storage-name", - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: "storage-name", + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -135,11 +123,8 @@ func TestServer_SendMessage(t *testing.T) { req: &gitalypb.RaftMessageRequest{ ClusterId: "wrong-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: "storage-name", - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: "storage-name", + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -149,42 +134,6 @@ func TestServer_SendMessage(t *testing.T) { expectedGrpcErr: codes.PermissionDenied, expectedError: `rpc error: code = PermissionDenied desc = message from wrong cluster: got "wrong-cluster", want "test-cluster"`, }, - { - desc: "missing authority name", - req: &gitalypb.RaftMessageRequest{ - ClusterId: "test-cluster", - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - PartitionId: 1, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - expectedError: "rpc error: code = InvalidArgument desc = authority_name is required", - }, - { - desc: "missing partition ID", - req: &gitalypb.RaftMessageRequest{ - ClusterId: "test-cluster", - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - 
expectedError: "rpc error: code = InvalidArgument desc = partition_id is required", - }, } for _, tc := range testCases { diff --git a/internal/gitaly/service/raft/send_snapshot.go b/internal/gitaly/service/raft/send_snapshot.go index e0925e90ed23f3a07e144e3958f070d221e5e38c..5d29415fefc07eafb65d696f4d1b78443d57edb2 100644 --- a/internal/gitaly/service/raft/send_snapshot.go +++ b/internal/gitaly/service/raft/send_snapshot.go @@ -27,7 +27,7 @@ func (s *Server) SendSnapshot(stream gitalypb.RaftService_SendSnapshotServer) (r return err } - fname := fmt.Sprintf("%016d-%016d-%016d%s", partitionKey.GetPartitionId(), raftMsg.GetMessage().Term, raftMsg.GetMessage().Index, ".snap") + fname := fmt.Sprintf("%s-%016d-%016d%s", partitionKey.GetValue(), raftMsg.GetMessage().Term, raftMsg.GetMessage().Index, ".snap") snapshotPath := filepath.Join(s.cfg.Raft.SnapshotDir, fname) snapshotFile, err := os.Create(snapshotPath) if err != nil { diff --git a/internal/gitaly/service/raft/send_snapshot_test.go b/internal/gitaly/service/raft/send_snapshot_test.go index d63a8454792e054c3884abe90a8e3d8b22f5a420..d52ff9dc78b14e5dd19a146ab8a21413aebe6089 100644 --- a/internal/gitaly/service/raft/send_snapshot_test.go +++ b/internal/gitaly/service/raft/send_snapshot_test.go @@ -1,6 +1,7 @@ package raft import ( + "fmt" "os" "path/filepath" "testing" @@ -57,10 +58,7 @@ func TestServer_SendSnapshot_Success(t *testing.T) { registry := storage.(*raftmgr.RaftEnabledStorage).GetReplicaRegistry() replica := &mockRaftReplica{} - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - } + partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) client := runRaftServer(t, ctx, cfg, mockNode) @@ -75,11 +73,8 @@ func TestServer_SendSnapshot_Success(t *testing.T) { RaftMsg: &gitalypb.RaftMessageRequest{ ClusterId: clusterID, ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ 
- AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameOne, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -110,7 +105,7 @@ func TestServer_SendSnapshot_Success(t *testing.T) { testhelper.RequireDirectoryState(t, cfg.Raft.SnapshotDir, "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, - "/0000000000000001-0000000000000002-0000000000000003.snap": {Mode: os.FileMode(0o644), Content: data}, + fmt.Sprintf("/%s-0000000000000002-0000000000000003.snap", partitionKey.GetValue()): {Mode: os.FileMode(0o644), Content: data}, }) } @@ -130,11 +125,8 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { RaftSnapshotPayload: &gitalypb.RaftSnapshotMessageRequest_RaftMsg{ RaftMsg: &gitalypb.RaftMessageRequest{ ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameOne, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -153,11 +145,8 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { RaftMsg: &gitalypb.RaftMessageRequest{ ClusterId: "wrong-cluster", ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - }, + StorageName: storageNameOne, + PartitionKey: raftmgr.NewPartitionKey(authorityName, 1), }, Message: &raftpb.Message{ Type: raftpb.MsgApp, @@ -169,50 +158,6 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { expectedGrpcErr: codes.PermissionDenied, expectedError: `rpc error: code = PermissionDenied desc = message from wrong cluster: got "wrong-cluster", want "test-cluster"`, }, - { - desc: "missing authority name", - req: &gitalypb.RaftSnapshotMessageRequest{ - RaftSnapshotPayload: &gitalypb.RaftSnapshotMessageRequest_RaftMsg{ - RaftMsg: &gitalypb.RaftMessageRequest{ - ClusterId: 
clusterID, - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - PartitionId: 1, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - expectedError: "rpc error: code = InvalidArgument desc = authority_name is required", - }, - { - desc: "missing partition ID", - req: &gitalypb.RaftSnapshotMessageRequest{ - RaftSnapshotPayload: &gitalypb.RaftSnapshotMessageRequest_RaftMsg{ - RaftMsg: &gitalypb.RaftMessageRequest{ - ClusterId: clusterID, - ReplicaId: &gitalypb.ReplicaID{ - StorageName: storageNameOne, - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: authorityName, - }, - }, - Message: &raftpb.Message{ - Type: raftpb.MsgApp, - To: 2, - }, - }, - }, - }, - expectedGrpcErr: codes.InvalidArgument, - expectedError: "rpc error: code = InvalidArgument desc = partition_id is required", - }, } for _, tc := range errorTestCases { @@ -254,10 +199,7 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { registry := storage.(*raftmgr.RaftEnabledStorage).GetReplicaRegistry() replica := &mockRaftReplica{} - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: authorityName, - PartitionId: 1, - } + partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) client := runRaftServer(t, ctx, cfg, mockNode) diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go index 4a2d5c0c88164b440f31bcb354e4ae573556f45b..30cb063be665716f3e128fff1cbfa7966c8891e6 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport.go @@ -31,10 +31,10 @@ type Transport interface { // Send dispatches a batch of Raft messages. It returns an error if the sending fails. 
This function receives a // context, the list of messages to send and a function that returns the path of WAL directory of a particular // log entry. The implementation must respect input context's cancellation. - Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, messages []raftpb.Message) error + Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.RaftPartitionKey, messages []raftpb.Message) error // Receive receives a Raft message and processes it. - Receive(ctx context.Context, partitionKey *gitalypb.PartitionKey, raftMsg raftpb.Message) error - SendSnapshot(ctx context.Context, partitionKey *gitalypb.PartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) error + Receive(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, raftMsg raftpb.Message) error + SendSnapshot(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) error } // GrpcTransport is a gRPC transport implementation for sending Raft messages across nodes. @@ -59,7 +59,7 @@ func NewGrpcTransport(logger log.Logger, cfg config.Cfg, routingTable RoutingTab } // Send sends Raft messages to the appropriate nodes. 
-func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, messages []raftpb.Message) error { +func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.RaftPartitionKey, messages []raftpb.Message) error { messagesByNode, err := t.prepareRaftMessageRequests(ctx, logReader, partitionKey, messages) if err != nil { return fmt.Errorf("preparing raft messages: %w", err) @@ -94,7 +94,7 @@ func (t *GrpcTransport) Send(ctx context.Context, logReader storage.LogReader, p return nil } -func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.PartitionKey, msgs []raftpb.Message) (map[string][]*gitalypb.RaftMessageRequest, error) { +func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReader storage.LogReader, partitionKey *gitalypb.RaftPartitionKey, msgs []raftpb.Message) (map[string][]*gitalypb.RaftMessageRequest, error) { messagesByAddress := make(map[string][]*gitalypb.RaftMessageRequest) messagesByAddressMutex := sync.Mutex{} g := &errgroup.Group{} @@ -209,7 +209,7 @@ func (t *GrpcTransport) packLogData(ctx context.Context, lsn storage.LSN, messag } // Receive receives a stream of Raft messages and processes them. -func (t *GrpcTransport) Receive(ctx context.Context, partitionKey *gitalypb.PartitionKey, raftMsg raftpb.Message) error { +func (t *GrpcTransport) Receive(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, raftMsg raftpb.Message) error { // Retrieve the replica from the registry, assumption is that all the messages are from the same partition key. replica, err := t.registry.GetReplica(partitionKey) if err != nil { @@ -290,7 +290,7 @@ func unpackLogData(msg *gitalypb.RaftEntry, logEntryPath string) error { } // SendSnapshot sends a snapshot of a partition to a specified node in the cluster. 
-func (t *GrpcTransport) SendSnapshot(ctx context.Context, pk *gitalypb.PartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) (returnedErr error) { +func (t *GrpcTransport) SendSnapshot(ctx context.Context, pk *gitalypb.RaftPartitionKey, message raftpb.Message, snapshot *ReplicaSnapshot) (returnedErr error) { followerMemberID := message.To // Find replica's address as recipient of snapshot @@ -325,11 +325,8 @@ func (t *GrpcTransport) SendSnapshot(ctx context.Context, pk *gitalypb.Partition RaftMsg: &gitalypb.RaftMessageRequest{ ClusterId: t.cfg.Raft.ClusterID, ReplicaId: &gitalypb.ReplicaID{ - StorageName: replica.GetStorageName(), - PartitionKey: &gitalypb.PartitionKey{ - AuthorityName: pk.GetAuthorityName(), - PartitionId: pk.GetPartitionId(), - }, + StorageName: replica.GetStorageName(), + PartitionKey: pk, }, Message: &message, }, diff --git a/internal/gitaly/storage/raftmgr/grpc_transport_test.go b/internal/gitaly/storage/raftmgr/grpc_transport_test.go index 67e796ab5b89cd40c63728d38b3aa8b5bc816b6b..c39ad4921b53b7e4a11b70a124faef7c32480235 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport_test.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport_test.go @@ -172,10 +172,7 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) { require.NoError(t, leader.transport.(*GrpcTransport).connectionPool.Close()) }) - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(tc.partitionID), - AuthorityName: storageName, - } + partitionKey := NewPartitionKey(storageName, storage.PartitionID(tc.partitionID)) mgr, err := leader.managerRegistry.GetReplica(partitionKey) require.NoError(t, err) @@ -294,10 +291,7 @@ func setupCluster(t *testing.T, logger logger.LogrusLogger, numNodes int, partit // Create and set up replica replica := newReplica(config) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: storageName, - PartitionId: uint64(partitionID), - } + partitionKey := NewPartitionKey(storageName, storage.PartitionID(partitionID)) // 
Register the manager with the registry registries[i].RegisterReplica(partitionKey, replica) @@ -575,10 +569,7 @@ func TestGrpcTransport_SendSnapshot(t *testing.T) { }() } - err = leader.transport.SendSnapshot(ctx, &gitalypb.PartitionKey{ - AuthorityName: storageName, - PartitionId: 1, - }, msg, snapshot) + err = leader.transport.SendSnapshot(ctx, NewPartitionKey(storageName, 1), msg, snapshot) if tc.expectedError != "" { require.ErrorContains(t, err, tc.expectedError) } else { diff --git a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go index bad08c6074ae1b8cfe0ee0cf7672ddf5799d3b0f..882935924df9e45a02e5a094b25b3b6d8e0856da 100644 --- a/internal/gitaly/storage/raftmgr/raft_enabled_storage.go +++ b/internal/gitaly/storage/raftmgr/raft_enabled_storage.go @@ -8,7 +8,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/log" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) // RaftEnabledStorage wraps a storage.Storage instance with Raft functionality @@ -36,12 +35,8 @@ func (s *RaftEnabledStorage) GetReplicaRegistry() ReplicaRegistry { // RegisterReplica registers a replica with this RaftEnabledStorage // This should be called after both the replica and RaftEnabledStorage are created -func (s *RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, replica *Replica) error { - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(partitionID), - AuthorityName: replica.authorityName, - } - s.replicaRegistry.RegisterReplica(partitionKey, replica) +func (s *RaftEnabledStorage) RegisterReplica(replica *Replica) error { + s.replicaRegistry.RegisterReplica(replica.partitionKey, replica) return nil } @@ -49,11 +44,7 @@ func (s *RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, re // DeregisterReplica removes a replica from this 
RaftEnabledStorage. // This should be called when the replica is closing. func (s *RaftEnabledStorage) DeregisterReplica(replica *Replica) { - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(replica.ptnID), - AuthorityName: replica.authorityName, - } - s.replicaRegistry.DeregisterReplica(partitionKey) + s.replicaRegistry.DeregisterReplica(replica.partitionKey) } // Node adds Raft functionality to each storage diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index b577ca6440ddcecc12573d6aaa453778bff4c23d..cbd00a0cedb64a02cf20dd7d065f5901572aae5d 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -2,6 +2,7 @@ package raftmgr import ( "context" + "crypto/sha256" "errors" "fmt" "runtime" @@ -142,29 +143,27 @@ func WithEntryRecorder(recorder *ReplicaEntryRecorder) OptionFunc { // Internally, the Replica integrates with etcd/raft to implement the Raft consensus algorithm // and implements the storage.LogManager interface to interact with Gitaly's transaction system. // -// A Replica is identified by a Replica ID, which consists of -// (Partition ID, Member ID, Replica Storage Name). +// A Replica is identified by a gitalypb.ReplicaID. 
type Replica struct { mutex sync.Mutex ctx context.Context // Context for controlling replica's lifecycle cancel context.CancelFunc - memberID uint64 // Member ID of the replica - authorityName string // Name of the storage this partition belongs to - ptnID storage.PartitionID // Unique identifier for the managed partition - node raft.Node // etcd/raft node representation - raftCfg config.Raft // etcd/raft configurations - options ReplicaOptions // Additional replica configuration - logger logging.Logger // Internal logging - logStore *ReplicaLogStore // Persistent storage for Raft logs and state - registry *ReplicaEventRegistry // Event tracking - leadership *ReplicaLeadership // Current leadership information - syncer safe.Syncer // Synchronization operations - wg sync.WaitGroup // Goroutine lifecycle management - ready *ready // Initialization state tracking - started bool // Indicates if replica has been started - metrics RaftMetrics // Scoped metrics for this replica + memberID uint64 // Member ID of the replica + partitionKey *gitalypb.RaftPartitionKey + node raft.Node // etcd/raft node representation + raftCfg config.Raft // etcd/raft configurations + options ReplicaOptions // Additional replica configuration + logger logging.Logger // Internal logging + logStore *ReplicaLogStore // Persistent storage for Raft logs and state + registry *ReplicaEventRegistry // Event tracking + leadership *ReplicaLeadership // Current leadership information + syncer safe.Syncer // Synchronization operations + wg sync.WaitGroup // Goroutine lifecycle management + ready *ready // Initialization state tracking + started bool // Indicates if replica has been started + metrics RaftMetrics // Scoped metrics for this replica // Reference to the RaftEnabledStorage that contains this replica raftEnabledStorage *RaftEnabledStorage @@ -207,7 +206,7 @@ func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ReplicaOptions, error type RaftReplicaFactory func( memberID uint64, 
storageName string, - partitionID storage.PartitionID, + partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -219,7 +218,7 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF return func( memberID uint64, storageName string, - partitionID storage.PartitionID, + partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -234,14 +233,14 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF return nil, fmt.Errorf("storage %q is not a RaftEnabledStorage", storageName) } - replica, err := NewReplica(memberID, storageName, partitionID, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) + replica, err := NewReplica(memberID, partitionKey, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) if err != nil { return nil, fmt.Errorf("create replica %q: %w", storageName, err) } - if err := raftEnabledStorage.RegisterReplica(partitionID, replica); err != nil { - return nil, fmt.Errorf("register replica for partition %d in storage %q: %w", - partitionID, storageName, err) + if err := raftEnabledStorage.RegisterReplica(replica); err != nil { + return nil, fmt.Errorf("register replica %q in storage %q: %w", + partitionKey.GetValue(), storageName, err) } return replica, nil @@ -254,8 +253,7 @@ func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionF // the Raft protocol operation. 
func NewReplica( memberID uint64, - authorityName string, - partitionID storage.PartitionID, + partitionKey *gitalypb.RaftPartitionKey, raftCfg config.Raft, logStore *ReplicaLogStore, raftEnabledStorage *RaftEnabledStorage, @@ -277,17 +275,15 @@ func NewReplica( } logger = logger.WithFields(logging.Fields{ - "component": "raft", - "raft.authority": authorityName, - "raft.partition": partitionID, + "component": "raft", + "raft.partitionKey": partitionKey.GetValue(), }) - scopedMetrics := metrics.Scope(authorityName) + scopedMetrics := metrics.Scope(logStore.storageName) return &Replica{ memberID: memberID, - authorityName: authorityName, - ptnID: partitionID, + partitionKey: partitionKey, raftCfg: raftCfg, options: options, logStore: logStore, @@ -323,7 +319,7 @@ func (replica *Replica) Initialize(ctx context.Context, appliedLSN storage.LSN) defer replica.mutex.Unlock() if replica.started { - return fmt.Errorf("raft replica for partition %q already started", replica.ptnID) + return fmt.Errorf("raft replica %q already started", replica.partitionKey.GetValue()) } replica.started = true @@ -861,18 +857,13 @@ func (replica *Replica) processConfChange(entry raftpb.Entry) error { return fmt.Errorf("saving config state: %w", err) } - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(replica.ptnID), - } - routingTable := replica.raftEnabledStorage.GetRoutingTable() if routingTable == nil { return fmt.Errorf("routing table not found") } // Apply the changes to the routing table - if err := routingTable.ApplyReplicaConfChange(partitionKey, replicaChanges); err != nil { + if err := routingTable.ApplyReplicaConfChange(replica.logStore.storageName, replica.partitionKey, replicaChanges); err != nil { return fmt.Errorf("applying conf changes: %w", err) } @@ -894,15 +885,11 @@ func (replica *Replica) sendMessages(rd *raft.Ready) error { // techniques such as batching health checks and quiescing inactive groups. 
// // See https://gitlab.com/gitlab-org/gitaly/-/issues/6304 - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(replica.ptnID), - } transport := replica.raftEnabledStorage.GetTransport() if transport == nil { return fmt.Errorf("transport not found") } - err := transport.Send(replica.ctx, replica, partitionKey, rd.Messages) + err := transport.Send(replica.ctx, replica, replica.partitionKey, rd.Messages) if err != nil { return err } @@ -1069,16 +1056,19 @@ func (replica *Replica) proposeMembershipChange( } func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) error { - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(replica.ptnID), - } - - _, err := routingTable.Translate(partitionKey, memberID) + _, err := routingTable.Translate(replica.partitionKey, memberID) if err != nil { return fmt.Errorf("translating member ID: %w", err) } return nil } +// NewPartitionKey creates a partition key for a newly-minted partition. A partition should only +// ever have a single RaftPartitionKey, computed by the replica which first created the partition. 
+func NewPartitionKey(storageName string, partitionID storage.PartitionID) *gitalypb.RaftPartitionKey { + return &gitalypb.RaftPartitionKey{ + Value: fmt.Sprintf("%x", sha256.Sum256([]byte(storageName+partitionID.String()))), + } +} + var _ = (storage.LogManager)(&Replica{}) // Ensure Replica implements LogManager interface diff --git a/internal/gitaly/storage/raftmgr/replica_log_store.go b/internal/gitaly/storage/raftmgr/replica_log_store.go index 3dd58099d5b8082a34da3b44cdf285294bbf6f18..c5a098e9620e83a83f2d3de91ee31f53af87b8c1 100644 --- a/internal/gitaly/storage/raftmgr/replica_log_store.go +++ b/internal/gitaly/storage/raftmgr/replica_log_store.go @@ -128,17 +128,17 @@ var ( Not ready to be used */ type ReplicaLogStore struct { - ctx context.Context - mutex sync.Mutex - authorityName string - partitionID storage.PartitionID - database keyvalue.Transactioner - localLog *log.Manager - committedLSN storage.LSN - lastTerm uint64 - consumer storage.LogConsumer - stagingDir string - snapshotter ReplicaSnapshotter + ctx context.Context + mutex sync.Mutex + storageName string + partitionID storage.PartitionID + database keyvalue.Transactioner + localLog *log.Manager + committedLSN storage.LSN + lastTerm uint64 + consumer storage.LogConsumer + stagingDir string + snapshotter ReplicaSnapshotter // hooks is a collection of hooks, used in test environment to intercept critical events hooks replicaHooks @@ -155,7 +155,7 @@ func raftManifestPath(logEntryPath string) string { // NewReplicaLogStore creates and initializes a new Storage instance. 
func NewReplicaLogStore( - authorityName string, + storageName string, partitionID storage.PartitionID, raftCfg config.Raft, db keyvalue.Transactioner, @@ -176,7 +176,7 @@ func NewReplicaLogStore( // Initialize the local log manager without a consumer since notifications // should only be sent when entries are committed, not when they're appended localLog := log.NewManager( - authorityName, + storageName, partitionID, stagingDirectory, stateDirectory, @@ -186,23 +186,23 @@ func NewReplicaLogStore( logger = logger.WithFields(lg.Fields{ "partition_id": partitionID, - "storage_name": authorityName, + "storage_name": storageName, }) - snapshotter, err := NewReplicaSnapshotter(raftCfg, logger, metrics.Scope(authorityName)) + snapshotter, err := NewReplicaSnapshotter(raftCfg, logger, metrics.Scope(storageName)) if err != nil { return nil, fmt.Errorf("create raft snapshotter: %w", err) } return &ReplicaLogStore{ - database: db, - authorityName: authorityName, - partitionID: partitionID, - localLog: localLog, - consumer: consumer, - stagingDir: stagingDirectory, - snapshotter: snapshotter, - hooks: noopHooks(), + database: db, + storageName: storageName, + partitionID: partitionID, + localLog: localLog, + consumer: consumer, + stagingDir: stagingDirectory, + snapshotter: snapshotter, + hooks: noopHooks(), }, nil } @@ -296,7 +296,7 @@ func (s *ReplicaLogStore) initialize(ctx context.Context, appliedLSN storage.LSN } if s.consumer != nil { - s.consumer.NotifyNewEntries(s.authorityName, s.partitionID, s.localLog.LowWaterMark(), s.committedLSN) + s.consumer.NotifyNewEntries(s.storageName, s.partitionID, s.localLog.LowWaterMark(), s.committedLSN) } } @@ -493,7 +493,7 @@ func (s *ReplicaLogStore) saveHardState(hardState raftpb.HardState) error { } if s.consumer != nil { - s.consumer.NotifyNewEntries(s.authorityName, s.partitionID, s.localLog.LowWaterMark(), committedLSN) + s.consumer.NotifyNewEntries(s.storageName, s.partitionID, s.localLog.LowWaterMark(), committedLSN) } return 
nil diff --git a/internal/gitaly/storage/raftmgr/replica_log_store_test.go b/internal/gitaly/storage/raftmgr/replica_log_store_test.go index 1f1ab964fe9082cbbd5cd482b4463106bd4df3e3..cd7a649c102840bdc35ebdd082af8ba45a8a1435 100644 --- a/internal/gitaly/storage/raftmgr/replica_log_store_test.go +++ b/internal/gitaly/storage/raftmgr/replica_log_store_test.go @@ -206,7 +206,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Notify for the first time. require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(4), highWaterMark: storage.LSN(3), @@ -249,7 +249,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Notify from low-water mark to the committedLSN for the first time. require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(3), highWaterMark: storage.LSN(3), @@ -1304,7 +1304,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from low water mark -> 1 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), @@ -1318,13 +1318,13 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from low water mark -> 2 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(2), @@ -1338,19 +1338,19 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from low water 
mark -> 3 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(2), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(3), @@ -1386,7 +1386,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from 1 -> 1 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), @@ -1404,13 +1404,13 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from 2 -> 2 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(2), @@ -1424,19 +1424,19 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { // Receive notification from 2 -> 3 require.Equal(t, []mockNotification{ { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: logStore.partitionID, lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(2), }, { - storageName: logStore.authorityName, + storageName: logStore.storageName, partitionID: 
logStore.partitionID, lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(3), diff --git a/internal/gitaly/storage/raftmgr/replica_registry.go b/internal/gitaly/storage/raftmgr/replica_registry.go index 4a519944dcf15a2375801760ffeb1186dc19012a..106e0f71252452522543bfd21a0f8753811f44d9 100644 --- a/internal/gitaly/storage/raftmgr/replica_registry.go +++ b/internal/gitaly/storage/raftmgr/replica_registry.go @@ -1,7 +1,6 @@ package raftmgr import ( - "fmt" "sync" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -10,18 +9,14 @@ import ( var errNoReplicaFound = structerr.NewNotFound("no replica found") -func partitionKeyToString(pk *gitalypb.PartitionKey) string { - return fmt.Sprintf("%d:%s", pk.GetPartitionId(), pk.GetAuthorityName()) -} - // ReplicaRegistry is an interface that defines the methods to register and retrieve replicas. type ReplicaRegistry interface { // GetReplica returns the replica for a given partition key. - GetReplica(key *gitalypb.PartitionKey) (RaftReplica, error) + GetReplica(key *gitalypb.RaftPartitionKey) (RaftReplica, error) // RegisterReplica registers a replica for a given partition key. - RegisterReplica(key *gitalypb.PartitionKey, replica RaftReplica) + RegisterReplica(key *gitalypb.RaftPartitionKey, replica RaftReplica) // DeregisterReplica removes the replica with the given key from the registry. - DeregisterReplica(key *gitalypb.PartitionKey) + DeregisterReplica(key *gitalypb.RaftPartitionKey) } // raftRegistry is a concrete implementation of the ReplicaRegistry interface. @@ -35,19 +30,19 @@ func NewReplicaRegistry() *raftRegistry { } // GetReplica returns the replica for a given partitionKey. 
-func (r *raftRegistry) GetReplica(key *gitalypb.PartitionKey) (RaftReplica, error) { - if mgr, ok := r.replicas.Load(partitionKeyToString(key)); ok { +func (r *raftRegistry) GetReplica(key *gitalypb.RaftPartitionKey) (RaftReplica, error) { + if mgr, ok := r.replicas.Load(key.GetValue()); ok { return mgr.(RaftReplica), nil } return nil, errNoReplicaFound.WithMetadata("partition_key", key) } // RegisterReplica registers a replica for a given partitionKey. -func (r *raftRegistry) RegisterReplica(key *gitalypb.PartitionKey, replica RaftReplica) { - r.replicas.LoadOrStore(partitionKeyToString(key), replica) +func (r *raftRegistry) RegisterReplica(key *gitalypb.RaftPartitionKey, replica RaftReplica) { + r.replicas.LoadOrStore(key.GetValue(), replica) } // DeregisterReplica removes the replica with the given key from the registry. -func (r *raftRegistry) DeregisterReplica(key *gitalypb.PartitionKey) { - r.replicas.Delete(partitionKeyToString(key)) +func (r *raftRegistry) DeregisterReplica(key *gitalypb.RaftPartitionKey) { + r.replicas.Delete(key.GetValue()) } diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index dd223561388ab555590fbe6e37c790ebf9ce4dc1..3b8b7601edb4aee1f8ddc45e15e9786497d37a22 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -156,7 +156,7 @@ func TestReplica_Initialize(t *testing.T) { // Second initialization should fail err = mgr.Initialize(ctx, 0) - require.EqualError(t, err, fmt.Sprintf("raft replica for partition %q already started", partitionID)) + require.EqualError(t, err, fmt.Sprintf("raft replica %q already started", mgr.partitionKey.GetValue())) require.NoError(t, mgr.Close()) }) @@ -239,7 +239,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr, err := raftFactory(1, storageName, partitionID, logStore, logger, metrics) + mgr, 
err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -317,7 +317,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr, err := raftFactory(1, storageName, partitionID, logStore, logger, metrics) + mgr, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -362,7 +362,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr1, err := raftFactory(1, storageName, partitionID, logStore1, logger, metrics) + mgr1, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore1, logger, metrics) require.NoError(t, err) // Initialize the raft replica @@ -424,7 +424,7 @@ func TestReplica_Initialize(t *testing.T) { logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) - mgr2, err := raftFactory(1, storageName, partitionID, logStore2, logger, metrics) + mgr2, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore2, logger, metrics) require.NoError(t, err) // Re-initialize Raft with the highest LSN @@ -711,7 +711,7 @@ func TestReplica_AppendLogEntry(t *testing.T) { mgr, err := raftFactory( 1, storageName, - partitionID, + NewPartitionKey(storageName, partitionID), logStore, logger, metrics, @@ -829,7 +829,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) // Configure replica - mgr, err := raftFactory(1, storageName, partitionID, logStore, logger, metrics) + mgr, err := 
raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) require.NoError(t, err) for _, f := range setupFuncs { @@ -882,7 +882,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(env.recorder)) - recoveryMgr, err := raftFactory(1, env.storageName, env.partitionID, logStore, logger, env.metrics) + recoveryMgr, err := raftFactory(1, env.storageName, NewPartitionKey(env.storageName, env.partitionID), logStore, logger, env.metrics) require.NoError(t, err) // Initialize with the last known LSN @@ -1663,7 +1663,7 @@ func TestReplica_StorageConnection(t *testing.T) { // Create factory that connects replicas to storage raftFactory := DefaultFactoryWithNode(cfg.Raft, raftNode) - replica, err := raftFactory(1, storageName, partitionID, logStore, logger, NewMetrics()) + replica, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, replica.Close()) }) @@ -1683,10 +1683,7 @@ func TestReplica_StorageConnection(t *testing.T) { registry := raftNodeStorage.GetReplicaRegistry() require.NotNil(t, registry) - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(partitionID), - AuthorityName: storageName, - } + partitionKey := NewPartitionKey(storageName, partitionID) registeredReplica, err := registry.GetReplica(partitionKey) require.NoError(t, err) @@ -1697,14 +1694,14 @@ func TestReplica_StorageConnection(t *testing.T) { }) t.Run("multiple replicas for same partition key", func(t *testing.T) { - duplicateReplica, err := raftFactory(1, storageName, partitionID, logStore, logger, NewMetrics()) + duplicateReplica, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, duplicateReplica) }) t.Run("Register different replicas for different 
partition keys", func(t *testing.T) { partitionID := storage.PartitionID(2) - replicaTwo, err := raftFactory(1, storageName, partitionID, logStore, logger, NewMetrics()) + replicaTwo, err := raftFactory(1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, replicaTwo) }) @@ -1726,10 +1723,7 @@ func TestReplica_AddNode(t *testing.T) { replica, err := createRaftReplica(t, ctx, memberID, socketPath, raftCfg, partitionID, metrics, opts...) require.NoError(t, err) - partitionKey := &gitalypb.PartitionKey{ - PartitionId: uint64(partitionID), - AuthorityName: cfg.Storages[0].Name, - } + partitionKey := NewPartitionKey(cfg.Storages[0].Name, partitionID) registry.RegisterReplica(partitionKey, replica) transport.registry = registry @@ -1793,10 +1787,7 @@ func TestReplica_AddNode(t *testing.T) { require.NotNil(t, raftEnabledStorage, "storage should be a RaftEnabledStorage") routingTable := raftEnabledStorage.GetRoutingTable() - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(partitionID), - } + partitionKey := replica.partitionKey // Create second node replicaTwo, socketPathTwo, srvTwo := createTestNode(t, ctx, 3, partitionID, raftCfg, metrics) @@ -1880,10 +1871,7 @@ func TestReplica_AddNode(t *testing.T) { require.NotNil(t, raftEnabledStorage, "storage should be a RaftEnabledStorage") routingTable := raftEnabledStorage.GetRoutingTable() - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: replica.authorityName, - PartitionId: uint64(partitionID), - } + partitionKey := replica.partitionKey var destinationAddresses []string var servers []*grpc.Server diff --git a/internal/gitaly/storage/raftmgr/routing_table.go b/internal/gitaly/storage/raftmgr/routing_table.go index 00628c9c60a01f8fb7b1a44a7fb960833bda597e..6cbb7a20e20d565cc4c655cb42e37306035ddc46 100644 --- a/internal/gitaly/storage/raftmgr/routing_table.go +++ 
b/internal/gitaly/storage/raftmgr/routing_table.go @@ -13,8 +13,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) -func routingKey(partitionKey *gitalypb.PartitionKey) []byte { - return []byte(fmt.Sprintf("/raft/%s/%d", partitionKey.GetAuthorityName(), partitionKey.GetPartitionId())) +func routingKey(partitionKey *gitalypb.RaftPartitionKey) []byte { + return []byte(fmt.Sprintf("raft/%s", partitionKey.GetValue())) } // RoutingTableEntry represents a Raft cluster's routing state for a partition. @@ -35,10 +35,10 @@ type ReplicaMetadata struct { // RoutingTable handles translation between member IDs and addresses type RoutingTable interface { - Translate(partitionKey *gitalypb.PartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) - GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error) + Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) + GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) UpsertEntry(entry RoutingTableEntry) error - ApplyReplicaConfChange(partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error + ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error } // PersistentRoutingTable implements the RoutingTable interface with KV storage @@ -50,7 +50,7 @@ type kvRoutingTable struct { // NewKVRoutingTable creates a new key-value based routing table implementation // that persists routing information using badgerDB. 
func NewKVRoutingTable(kvStore keyvalue.Store) *kvRoutingTable { - prefix := []byte(fmt.Sprintf("p/%d", storagemgr.MetadataPartitionID)) + prefix := storagemgr.KeyPrefixPartition(storagemgr.MetadataPartitionID) prefixedStore := keyvalue.NewPrefixedTransactioner(kvStore, prefix) return &kvRoutingTable{ kvStore: prefixedStore, @@ -108,7 +108,7 @@ func (r *kvRoutingTable) UpsertEntry(entry RoutingTableEntry) error { } // GetEntry retrieves a routing table entry -func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error) { +func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) { key := routingKey(partitionKey) var entry RoutingTableEntry @@ -129,7 +129,7 @@ func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.PartitionKey) (*Routing } // Translate returns the storage name and address for a given partition key and member ID -func (r *kvRoutingTable) Translate(partitionKey *gitalypb.PartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) { +func (r *kvRoutingTable) Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) { entry, err := r.GetEntry(partitionKey) if err != nil { return nil, fmt.Errorf("get entry: %w", err) @@ -144,7 +144,7 @@ func (r *kvRoutingTable) Translate(partitionKey *gitalypb.PartitionKey, memberID return nil, fmt.Errorf("no address found for memberID %d", memberID) } -func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error { +func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error { routingTableEntry, err := r.GetEntry(partitionKey) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("getting routing table entry: %w", err) @@ -160,7 +160,6 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.Partition 
routingTableEntry.Term = changes.Term() routingTableEntry.Index = changes.Index() - authorityName := partitionKey.GetAuthorityName() metadata := changes.Metadata() for _, confChange := range changes.Changes() { @@ -179,7 +178,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.Partition replica := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: authorityName, + StorageName: storageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, } @@ -199,7 +198,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.Partition learner := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: authorityName, + StorageName: storageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_LEARNER, } diff --git a/internal/gitaly/storage/raftmgr/routing_table_test.go b/internal/gitaly/storage/raftmgr/routing_table_test.go index 658030183ee3d4518050795d279600f197d5c01c..890c181cd8ab74c82392ee26744a21634e8ac7b0 100644 --- a/internal/gitaly/storage/raftmgr/routing_table_test.go +++ b/internal/gitaly/storage/raftmgr/routing_table_test.go @@ -24,10 +24,7 @@ func TestPersistentRoutingTable(t *testing.T) { t.Run("add and translate member", func(t *testing.T) { memberID := 1 address := "localhost:1234" - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) entry := RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -54,10 +51,7 @@ func TestPersistentRoutingTable(t *testing.T) { }) t.Run("stale entry rejected", func(t *testing.T) { - key := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 2, - } + key := NewPartitionKey("test-authority", 2) entry1 := RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -83,10 +77,7 @@ func TestPersistentRoutingTable(t *testing.T) { }) t.Run("node not found", func(t *testing.T) { 
- partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 3, - } + partitionKey := NewPartitionKey("test-authority", 3) memberID := 999 // Non-existent node @@ -116,15 +107,12 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -148,10 +136,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -181,7 +166,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(2, 2, 1, 1, nil) changes.AddChange(2, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -206,15 +191,12 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddLearnerNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) 
updatedEntry, err := rt.GetEntry(partitionKey) @@ -236,15 +218,12 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) changes := NewReplicaConfChanges(1, 1, 0, 1, createMetadata("localhost:1234")) changes.AddChange(0, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID should be non-zero") }) @@ -261,23 +240,20 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) // First add a node changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) // Try to add the same node ID again changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID 1 already exists") }) @@ -294,23 +270,20 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) // Add a node with ID 1 changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = 
rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) // Try to update a non-existent node with ID 2 changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) changes.AddChange(2, ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID 2 not found for update") }) @@ -327,10 +300,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) entry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -351,7 +321,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(entry.Term, entry.Index, 1, 1, nil) changes.AddChange(1, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "no replicas to upsert") }) @@ -368,10 +338,7 @@ func TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -393,7 +360,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes := NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) changes.AddChange(1, ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -416,10 +383,7 @@ func 
TestApplyReplicaConfChange(t *testing.T) { rt := NewKVRoutingTable(kvStore) - partitionKey := &gitalypb.PartitionKey{ - AuthorityName: "test-authority", - PartitionId: 1, - } + partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{ @@ -442,7 +406,7 @@ func TestApplyReplicaConfChange(t *testing.T) { changes.AddChange(1, ConfChangeRemoveNode) changes.AddChange(2, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange(partitionKey, changes) + err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index 8d28357ae356624f1fcbbcb8865c437ae94b7f94..37a513c534c8c7417a54f061834792abf94c90db 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -141,7 +141,7 @@ func createRaftReplicaWithConfig(t *testing.T, ctx context.Context, raftCfg conf } raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, config.Options...) 
- return raftFactory(config.MemberID, storageName, config.PartitionID, logStore, logger, metrics) + return raftFactory(config.MemberID, storageName, NewPartitionKey(storageName, config.PartitionID), logStore, logger, metrics) } func createTempServer(t *testing.T, transport *GrpcTransport) (string, *grpc.Server) { diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 6adca834a2f3650479ac83caa95a7144946719c2..c5ac7db2c2c99c057774f05d5d2de3c0848d61e8 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -64,6 +64,13 @@ func (f Factory) New( if f.raftCfg.Enabled { factory := f.raftFactory + // TODO: The PartitionKey will be retrieved from the incoming context when a Raft + // replica is booted up. This allows us to maintain a persistent key across + // all replicas of the partition. + // storageName will then represent the target storage where we intend to + // place the new replicated partition. 
+ partitionKey := raftmgr.NewPartitionKey(storageName, partitionID) + absoluteStateDir = getRaftPartitionPath(storageName, partitionID, absoluteStateDir) replicaLogStore, err := raftmgr.NewReplicaLogStore( @@ -84,7 +91,7 @@ func (f Factory) New( raftReplica, err := factory( 1, storageName, - partitionID, + partitionKey, replicaLogStore, logger, f.partitionMetrics.raft, diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index b4272213378d91884093b2f31f8181ff6e76940f..b30556f88d6ed982444d95fc39cca98b8150e99e 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -43,6 +43,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" logrus "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -1733,7 +1734,8 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas for _, key := range raftmgr.RaftDBKeys { tc.expectedState.Database[string(key)] = &anypb.Any{} } - peerKey := fmt.Sprintf("p/1/raft/%s/%d", storageName, setup.PartitionID) + partitionPrefix := storagemgr.KeyPrefixPartition(storagemgr.MetadataPartitionID) + peerKey := fmt.Sprintf("%sraft/%s", partitionPrefix, raftmgr.NewPartitionKey(storageName, setup.PartitionID).GetValue()) if _, ok := tc.expectedState.Database[peerKey]; !ok { tc.expectedState.Database[peerKey] = &anypb.Any{} } diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index 
b0d0e1dbf8615ff7b26231a3e017900183d084bb..0f6c6a9ad5e1d43cc78bba6d5194fb844c97699c 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -302,7 +302,8 @@ func clearStagingDirectory(stagingDir string) error { return nil } -func keyPrefixPartition(ptnID storage.PartitionID) []byte { +// KeyPrefixPartition returns the prefix to be used for KV-store entries scoped to ptnID. +func KeyPrefixPartition(ptnID storage.PartitionID) []byte { return []byte(fmt.Sprintf("%s%s/", PrefixPartition, ptnID.MarshalBinary())) } @@ -540,7 +541,7 @@ func (sm *StorageManager) startPartition(ctx context.Context, partitionID storag mgr := sm.partitionFactory.New( logger, partitionID, - keyvalue.NewPrefixedTransactioner(sm.database, keyPrefixPartition(partitionID)), + keyvalue.NewPrefixedTransactioner(sm.database, KeyPrefixPartition(partitionID)), sm.name, sm.path, absoluteStateDir, @@ -688,7 +689,7 @@ func (sm *StorageManager) ListPartitions(partitionID storage.PartitionID) (stora pi := &partitionIterator{ txn: txn, it: iter, - seek: keyPrefixPartition(partitionID), + seek: KeyPrefixPartition(partitionID), } return pi, nil diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go index 489443ba06b50b365a00bc881220eecb48b01f06..c7afae49aeaa272b81763050a303be8f4a42be83 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go @@ -1513,8 +1513,8 @@ func TestStorageManager_ListPartitions(t *testing.T) { // Creating multiple partition keys with duplicates require.NoError(t, storageMgr.database.Update(func(tx keyvalue.ReadWriter) error { for i := initialPartitionID; i < 4; i++ { - require.NoError(t, tx.Set(append(keyPrefixPartition(storage.PartitionID(i)), []byte("key1")...), nil)) - require.NoError(t, tx.Set(append(keyPrefixPartition(storage.PartitionID(i)), 
[]byte("key2")...), nil)) + require.NoError(t, tx.Set(append(KeyPrefixPartition(storage.PartitionID(i)), []byte("key1")...), nil)) + require.NoError(t, tx.Set(append(KeyPrefixPartition(storage.PartitionID(i)), []byte("key2")...), nil)) } return nil })) diff --git a/proto/cluster.proto b/proto/cluster.proto index 3bc4f75ca3d3b2bdae93b39d695cf0a3b05d6f62..74dffe5c4f31f63964f1912dde5d6b7431b12ae4 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -36,24 +36,21 @@ message RaftEntry { LogData data = 2; } - -// PartitionKey identifies which partition this replica belongs to. -message PartitionKey { - // authority_name represents the storage that created the partition. - string authority_name = 1 [(gitaly.storage) = true]; - // partition_id is the local incrementing ID of a specific partition within a - // storage. Together with `authority_name`, this forms a unique identifier for - // a partition across the cluster. A partition belongs to a Raft group. - uint64 partition_id = 2; +// RaftPartitionKey is a globally-unique identifier for a replicated partition. +// The replica which minted the partition is responsible for computing the RaftPartitionKey, +// which is a hash of the storage name and partition ID. The key is then consumed +// as-is by other replicas wishing to store the partition. +message RaftPartitionKey { + // value is the SHA256 digest of the storage name and partition ID of the + // newly-minted partition. + string value = 1; } - - // ReplicaID uniquely identifies a replica in the Raft cluster. // It combines partition information with node-specific details. message ReplicaID { // partition_key identifies which partition this replica belongs to. - PartitionKey partition_key = 1; + RaftPartitionKey partition_key = 1; // member_id is the unique identifier assigned by etcd/raft. uint64 member_id = 2; // storage_name is the name of the storage where this replica is hosted. 
diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index cdd0b21433bdb926184117c20baf42ce925aeb8e..a6825dae943f43ed1177118e7743ef30a9085569 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -134,33 +134,33 @@ func (x *RaftEntry) GetData() *RaftEntry_LogData { return nil } -// PartitionKey identifies which partition this replica belongs to. -type PartitionKey struct { +// RaftPartitionKey is a globally-unique identifier for a replicated partition. +// The replica which minted the partition is responsible for computing the RaftPartitionKey, +// which is a hash of the storage name and partition ID. The key is then consumed +// as-is by other replicas wishing to store the partition. +type RaftPartitionKey struct { state protoimpl.MessageState `protogen:"open.v1"` - // authority_name represents the storage that created the partition. - AuthorityName string `protobuf:"bytes,1,opt,name=authority_name,json=authorityName,proto3" json:"authority_name,omitempty"` - // partition_id is the local incrementing ID of a specific partition within a - // storage. Together with `authority_name`, this forms a unique identifier for - // a partition across the cluster. A partition belongs to a Raft group. - PartitionId uint64 `protobuf:"varint,2,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + // value is the SHA256 digest of the storage name and partition ID of the + // newly-minted partition. 
+ Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *PartitionKey) Reset() { - *x = PartitionKey{} +func (x *RaftPartitionKey) Reset() { + *x = RaftPartitionKey{} mi := &file_cluster_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *PartitionKey) String() string { +func (x *RaftPartitionKey) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PartitionKey) ProtoMessage() {} +func (*RaftPartitionKey) ProtoMessage() {} -func (x *PartitionKey) ProtoReflect() protoreflect.Message { +func (x *RaftPartitionKey) ProtoReflect() protoreflect.Message { mi := &file_cluster_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -172,31 +172,24 @@ func (x *PartitionKey) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PartitionKey.ProtoReflect.Descriptor instead. -func (*PartitionKey) Descriptor() ([]byte, []int) { +// Deprecated: Use RaftPartitionKey.ProtoReflect.Descriptor instead. +func (*RaftPartitionKey) Descriptor() ([]byte, []int) { return file_cluster_proto_rawDescGZIP(), []int{1} } -func (x *PartitionKey) GetAuthorityName() string { +func (x *RaftPartitionKey) GetValue() string { if x != nil { - return x.AuthorityName + return x.Value } return "" } -func (x *PartitionKey) GetPartitionId() uint64 { - if x != nil { - return x.PartitionId - } - return 0 -} - // ReplicaID uniquely identifies a replica in the Raft cluster. // It combines partition information with node-specific details. type ReplicaID struct { state protoimpl.MessageState `protogen:"open.v1"` // partition_key identifies which partition this replica belongs to. 
- PartitionKey *PartitionKey `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` + PartitionKey *RaftPartitionKey `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` // member_id is the unique identifier assigned by etcd/raft. MemberId uint64 `protobuf:"varint,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"` // storage_name is the name of the storage where this replica is hosted. @@ -239,7 +232,7 @@ func (*ReplicaID) Descriptor() ([]byte, []int) { return file_cluster_proto_rawDescGZIP(), []int{2} } -func (x *ReplicaID) GetPartitionKey() *PartitionKey { +func (x *ReplicaID) GetPartitionKey() *RaftPartitionKey { if x != nil { return x.PartitionKey } @@ -643,78 +636,75 @@ var file_cluster_proto_rawDesc = string([]byte{ 0x0a, 0x0a, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, - 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x5e, 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x2b, 0x0a, 0x0e, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, - 0x74, 0x79, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x04, 0x88, - 0xc6, 0x2c, 0x01, 0x52, 0x0d, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x4e, 0x61, - 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xf6, 0x02, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x49, 0x44, 0x12, 0x39, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 
0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, - 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, - 0x0a, 0x09, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x36, - 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x49, 0x44, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, - 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, - 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, - 0x5d, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, - 0x0a, 0x18, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, - 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, - 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, - 0x45, 0x52, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, - 0x54, 0x59, 0x50, 
0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, - 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, - 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, 0x52, 0x61, 0x66, - 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, - 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, - 0x12, 0x16, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, - 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, - 0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, - 0x64, 0x22, 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 
0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, - 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, - 0x68, 0x6f, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x32, 0xc6, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, - 0x65, 0x6e, 0x64, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, - 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, - 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, - 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 
0x79, - 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x28, 0x0a, 0x10, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, + 0xfa, 0x02, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x3d, 0x0a, + 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, + 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, + 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, 0x0a, 0x09, + 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x6f, + 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x08, + 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, + 0x44, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, + 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 
0x1a, 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5d, 0x0a, + 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x18, + 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, + 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, + 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, 0x45, 0x52, + 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, 0x01, 0x0a, + 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, + 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, 0x52, 0x61, 0x66, 0x74, 0x53, + 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 
0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6d, 0x73, + 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x16, + 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, + 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x73, + 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, + 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, + 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, 0x69, 0x7a, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x53, 0x69, 0x7a, 0x65, 0x32, 0xc6, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, + 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, + 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, 0x65, 0x6e, + 0x64, 0x53, 0x6e, 
0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, + 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x42, 0x34, + 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, + 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, + 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( @@ -734,7 +724,7 @@ var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_cluster_proto_goTypes = []any{ (ReplicaID_ReplicaType)(0), // 0: gitaly.ReplicaID.ReplicaType (*RaftEntry)(nil), // 1: gitaly.RaftEntry - (*PartitionKey)(nil), // 2: gitaly.PartitionKey + (*RaftPartitionKey)(nil), // 2: gitaly.RaftPartitionKey (*ReplicaID)(nil), // 3: gitaly.ReplicaID (*RaftMessageRequest)(nil), // 4: gitaly.RaftMessageRequest (*RaftMessageResponse)(nil), // 5: gitaly.RaftMessageResponse @@ -746,7 +736,7 @@ var file_cluster_proto_goTypes = []any{ } var file_cluster_proto_depIdxs = []int32{ 8, // 0: gitaly.RaftEntry.data:type_name -> gitaly.RaftEntry.LogData - 2, // 1: gitaly.ReplicaID.partition_key:type_name -> gitaly.PartitionKey + 2, // 1: gitaly.ReplicaID.partition_key:type_name -> gitaly.RaftPartitionKey 9, // 2: gitaly.ReplicaID.metadata:type_name -> gitaly.ReplicaID.Metadata 0, // 3: gitaly.ReplicaID.type:type_name -> gitaly.ReplicaID.ReplicaType 3, // 4: gitaly.RaftMessageRequest.replica_id:type_name -> gitaly.ReplicaID