From cda6966eb93eb084f42141e6cd95181295528ad1 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 22:22:01 +0530 Subject: [PATCH 1/8] raft: Add relative_path field in ReplicaID Add a relative_path field to the ReplicaID message type to enable repository identification during Raft operations. This field will be used to bootstrap replicas. --- proto/cluster.proto | 3 + proto/go/gitalypb/cluster.pb.go | 373 ++++++++++++++++---------------- 2 files changed, 195 insertions(+), 181 deletions(-) diff --git a/proto/cluster.proto b/proto/cluster.proto index 1af29fc1302..5fc78060780 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -59,6 +59,9 @@ message ReplicaID { message Metadata { // address is the network address of the replica. string address = 1; + // relative_path of the repository + string relative_path = 2; + } // metadata contains replica information. Metadata metadata = 4; diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index 63c345ac4b3..4117391f27b 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -1122,7 +1122,9 @@ func (x *RaftEntry_LogData) GetPacked() []byte { type ReplicaID_Metadata struct { state protoimpl.MessageState `protogen:"open.v1"` // address is the network address of the replica. - Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + // relative_path of the repository + RelativePath string `protobuf:"bytes,2,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1164,6 +1166,13 @@ func (x *ReplicaID_Metadata) GetAddress() string { return "" } +func (x *ReplicaID_Metadata) GetRelativePath() string { + if x != nil { + return x.RelativePath + } + return "" +} + // StorageStats contains statistics for a specific storage/authority. 
type ClusterStatistics_StorageStats struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1332,7 +1341,7 @@ var file_cluster_proto_rawDesc = string([]byte{ 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x28, 0x0a, 0x10, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, - 0xfa, 0x02, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x3d, 0x0a, + 0x9f, 0x03, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, @@ -1347,188 +1356,190 @@ var file_cluster_proto_rawDesc = string([]byte{ 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, - 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, 0x49, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5d, 0x0a, - 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x18, - 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, - 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, - 0x50, 0x4c, 
0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, 0x45, 0x52, - 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, 0x01, 0x0a, - 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, - 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, - 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, 0x52, 0x61, 0x66, 0x74, 0x53, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, + 0x74, 0x68, 0x22, 0x5d, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x1c, 0x0a, 0x18, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, + 0x16, 0x0a, 0x12, 0x52, 0x45, 0x50, 0x4c, 0x49, 
0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, + 0x56, 0x4f, 0x54, 0x45, 0x52, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, + 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, + 0x02, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, + 0x74, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, + 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, + 0x66, 0x74, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, + 0x4d, 0x73, 0x67, 0x12, 0x16, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 
0x15, 0x72, + 0x61, 0x66, 0x74, 0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, + 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, + 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x22, 0xb4, 0x02, 0x0a, 0x12, 0x4a, + 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, + 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, + 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, + 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, + 0x09, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, + 0x72, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x14, + 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, + 0x6e, 0x64, 0x65, 0x78, 0x12, 0x27, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, + 0x6e, 0x61, 0x6d, 0x65, 
0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x42, 0x04, 0x88, 0xc6, 0x2c, 0x01, + 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, + 0x74, 0x68, 0x12, 0x2d, 0x0a, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x08, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x73, 0x22, 0x15, 0x0a, 0x13, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa1, 0x02, 0x0a, 0x14, 0x47, 0x65, 0x74, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, + 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, + 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, + 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, + 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, + 0x50, 0x61, 0x74, 0x68, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x36, + 0x0a, 0x17, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x5f, 
0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x5f, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x15, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x44, + 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x34, 0x0a, 0x16, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, + 0x65, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x14, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x52, + 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x22, 0xd2, 0x03, 0x0a, + 0x11, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, + 0x63, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x74, 0x6f, + 0x74, 0x61, 0x6c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2d, 0x0a, + 0x12, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x68, 0x65, 0x61, 0x6c, 0x74, + 0x68, 0x79, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, 0x0a, 0x0e, + 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x72, + 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x68, + 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x50, + 0x0a, 0x0d, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, + 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, + 
0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, + 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x52, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, + 0x1a, 0x56, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, + 0x12, 0x21, 0x0a, 0x0c, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6f, + 0x75, 0x6e, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x72, 0x65, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x67, 0x0a, 0x11, 0x53, 0x74, 0x6f, 0x72, + 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x3c, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, + 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x22, 0xa7, 0x04, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x63, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 
0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x47, 0x0a, 0x08, 0x72, 0x65, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, + 0x65, 0x72, 0x6d, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, + 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x25, + 0x0a, 0x0e, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, + 0x18, 0x08, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, + 0x50, 0x61, 0x74, 0x68, 0x73, 0x1a, 0xd3, 0x01, 0x0a, 0x0d, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x1b, 0x0a, 
0x09, 0x69, 0x73, 0x5f, + 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, + 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x0a, 0x69, 0x73, 0x5f, 0x68, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x69, 0x73, 0x48, 0x65, + 0x61, 0x6c, 0x74, 0x68, 0x79, 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x69, 0x6e, + 0x64, 0x65, 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6c, 0x61, 0x73, 0x74, 0x49, + 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x6e, + 0x64, 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6d, 0x61, 0x74, 0x63, 0x68, + 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x37, 0x0a, 0x16, 0x52, + 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x49, 0x64, 0x22, 0x73, 0x0a, 0x17, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x39, + 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x0a, 0x73, + 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x32, 0xcf, 0x03, 0x0a, 0x0b, 0x52, 0x61, + 0x66, 0x74, 
0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, + 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, + 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, + 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, + 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6d, 0x73, - 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x16, - 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, - 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x73, - 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, - 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, - 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 
0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, 0x69, 0x7a, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, - 0x74, 0x53, 0x69, 0x7a, 0x65, 0x22, 0xb4, 0x02, 0x0a, 0x12, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3d, 0x0a, 0x0d, - 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, - 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x6c, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, - 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x65, 0x6d, 0x62, - 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6d, 0x65, 0x6d, - 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, - 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, - 0x27, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, - 0x06, 0x20, 0x01, 0x28, 0x09, 0x42, 0x04, 0x88, 0xc6, 0x2c, 0x01, 0x52, 0x0b, 0x73, 0x74, 0x6f, - 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, - 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x2d, 
0x0a, - 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x49, 0x44, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x22, 0x15, 0x0a, 0x13, - 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0xa1, 0x02, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, - 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0d, 0x70, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, - 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, - 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, - 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x36, 0x0a, 0x17, 0x69, 0x6e, 0x63, - 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x64, 0x65, 0x74, - 0x61, 0x69, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x69, 0x6e, 0x63, 0x6c, - 0x75, 0x64, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, - 0x73, 0x12, 0x34, 0x0a, 0x16, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x72, 0x65, 0x6c, - 0x61, 0x74, 0x69, 0x76, 0x65, 
0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x14, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x52, 0x65, 0x6c, 0x61, 0x74, 0x69, - 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x22, 0xd2, 0x03, 0x0a, 0x11, 0x43, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x12, 0x29, 0x0a, - 0x10, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2d, 0x0a, 0x12, 0x68, 0x65, 0x61, 0x6c, - 0x74, 0x68, 0x79, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x74, 0x6f, 0x74, 0x61, 0x6c, - 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x0d, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x29, - 0x0a, 0x10, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x79, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x50, 0x0a, 0x0d, 0x73, 0x74, 0x6f, - 0x72, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x72, - 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0c, 0x73, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x1a, 0x56, 0x0a, 0x0c, 0x53, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 
0x12, 0x21, 0x0a, 0x0c, 0x6c, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x0b, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x23, - 0x0a, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x43, 0x6f, - 0x75, 0x6e, 0x74, 0x1a, 0x67, 0x0a, 0x11, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, - 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x3c, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, - 0x74, 0x69, 0x63, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, - 0x73, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xa7, 0x04, 0x0a, - 0x15, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x47, 0x0a, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x47, 
0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x1b, 0x0a, - 0x09, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x08, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, - 0x72, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x14, - 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, - 0x6e, 0x64, 0x65, 0x78, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, - 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, - 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x6c, - 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, - 0x1a, 0xd3, 0x01, 0x0a, 0x0d, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, - 0x63, 0x61, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x1d, 0x0a, 0x0a, 0x69, 0x73, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x69, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, - 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x61, 0x73, 
0x74, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6c, 0x61, 0x73, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, - 0x1f, 0x0a, 0x0b, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x64, 0x65, 0x78, - 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x37, 0x0a, 0x16, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, + 0x28, 0x01, 0x12, 0x50, 0x0a, 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, + 0x08, 0x01, 0x10, 0x02, 0x12, 0x58, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, + 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x30, 0x01, 0x12, 0x5b, + 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, + 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 
0x73, 0x74, - 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x22, - 0x73, 0x0a, 0x17, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, - 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, - 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, - 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, - 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x69, 0x73, - 0x74, 0x69, 0x63, 0x73, 0x32, 0xcf, 0x03, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, - 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, - 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, - 0x6f, 0x74, 0x4d, 0x65, 
0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x50, 0x0a, - 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x12, - 0x58, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x1c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, - 0x97, 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x0e, 0x47, 0x65, 0x74, - 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, - 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x38, 
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, + 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x38, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( -- GitLab From 11b1bcd8ded32a24bd24cdc28dfe2d139f27263f Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 22:27:08 +0530 Subject: [PATCH 2/8] raft: Add support to pass partitionInfo through context To support dynamic replica creation we need to pass down the original partitionInfo received from the leader through context. 
--- internal/gitaly/storage/context.go | 30 +++++++++++++++++++ .../storage/storagemgr/partition_manager.go | 1 - .../storagemgr/partition_manager_test.go | 3 ++ 3 files changed, 33 insertions(+), 1 deletion(-) diff --git a/internal/gitaly/storage/context.go b/internal/gitaly/storage/context.go index dc076310c9d..91786820e75 100644 --- a/internal/gitaly/storage/context.go +++ b/internal/gitaly/storage/context.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" "google.golang.org/grpc/metadata" ) @@ -47,6 +48,35 @@ func ExtractTransaction(ctx context.Context) Transaction { return value.(Transaction) } +type keyPartitionInfo struct{} + +type partitionInfo struct { + PartitionKey *gitalypb.RaftPartitionKey + MemberID uint64 + RelativePath string +} + +// ContextWithPartitionInfo stores the partition info into the context. +// This is used to pass the original partition info to the partition factory. +func ContextWithPartitionInfo(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, memberID uint64, relativePath string) context.Context { + return context.WithValue(ctx, keyPartitionInfo{}, partitionInfo{ + PartitionKey: partitionKey, + MemberID: memberID, + RelativePath: relativePath, + }) +} + +// ExtractPartitionInfo extracts the partition info from the context. A zero-value partitionInfo is returned if there's +// no partition info in the context. 
+func ExtractPartitionInfo(ctx context.Context) partitionInfo { + value := ctx.Value(keyPartitionInfo{}) + if value == nil { + return partitionInfo{} + } + + return value.(partitionInfo) +} + const keyPartitioningHint = "gitaly-partitioning-hint" // ContextWithPartitioningHint stores the relativePath as a partitioning hint into the incoming diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index 7854becb848..1e130be50a4 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -541,7 +541,6 @@ func (sm *StorageManager) startPartition(ctx context.Context, partitionID storag } logger := sm.logger.WithField("partition_id", partitionID) - mgr := sm.partitionFactory.New( ctx, logger, diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go index ebdbc9d8d9e..1fe4af2706a 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go @@ -32,6 +32,7 @@ import ( type mockPartitionFactory struct { new func( + ctx context.Context, logger log.Logger, partitionID storage.PartitionID, db keyvalue.Transactioner, @@ -47,6 +48,7 @@ type mockPartitionFactory struct { func newStubPartitionFactory() PartitionFactory { return mockPartitionFactory{ new: func( + ctx context.Context, logger log.Logger, partitionID storage.PartitionID, db keyvalue.Transactioner, @@ -129,6 +131,7 @@ func (m mockPartitionFactory) New( stagingDir string, ) Partition { return m.new( + ctx, logger, partitionID, db, -- GitLab From 95f130700a4bbd2ddf1553988adc127765040653 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 22:59:54 +0530 Subject: [PATCH 3/8] raft: Pass destination storage name to cluster membership methods During bootstrapping, we need a way to determine the specific storage on 
the destination node where the replica will be created. --- internal/gitaly/storage/raftmgr/replica.go | 27 +++++---- .../storage/raftmgr/replica_conf_change.go | 58 ++++++++++-------- .../raftmgr/replica_conf_change_test.go | 23 +++++--- .../gitaly/storage/raftmgr/replica_test.go | 18 +++--- .../gitaly/storage/raftmgr/routing_table.go | 17 ++---- .../storage/raftmgr/routing_table_test.go | 59 ++++++++++--------- 6 files changed, 110 insertions(+), 92 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index 68ae6faaed4..d50f1deb650 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -56,13 +56,13 @@ type RaftReplica interface { // RemoveNode removes a node from the Raft cluster. // This operation can only be performed by the leader. - RemoveNode(ctx context.Context, memberID uint64) error + RemoveNode(ctx context.Context, memberID uint64, destinationStorageName string) error // AddLearner adds a new non-voting learner node to the Raft cluster. // Learner nodes receive log entries but don't participate in elections. // This is typically used to bring new nodes up to speed before promoting them to voters. // This operation can only be performed by the leader. - AddLearner(ctx context.Context, address string) error + AddLearner(ctx context.Context, address, destinationStorageName string) error // GetCurrentState returns comprehensive current state information including term, index, and Raft state. // This provides an efficient way to get multiple state values with consistent locking. 
@@ -929,7 +929,7 @@ func (replica *Replica) processConfChange(entry raftpb.Entry) error { routingTable := replica.raftEnabledStorage.GetRoutingTable() // Apply the changes to the routing table - if err := routingTable.ApplyReplicaConfChange(replica.logStore.storageName, replica.partitionKey, replicaChanges); err != nil { + if err := routingTable.ApplyReplicaConfChange(replica.partitionKey, replicaChanges); err != nil { return fmt.Errorf("applying conf changes: %w", err) } @@ -1031,19 +1031,20 @@ func (replica *Replica) Step(ctx context.Context, msg raftpb.Message) error { func (replica *Replica) AddNode(ctx context.Context, address, destinationStorageName string) error { memberID := uint64(replica.AppendedLSN() + 1) return replica.proposeMembershipChange(ctx, string(addVoter), destinationStorageName, memberID, ConfChangeAddNode, &gitalypb.ReplicaID_Metadata{ - Address: address, + Address: address, + RelativePath: replica.relativePath, }) } // RemoveNode implements RaftReplica.RemoveNode -func (replica *Replica) RemoveNode(ctx context.Context, memberID uint64) error { - return replica.proposeMembershipChange(ctx, string(removeNode), "", memberID, ConfChangeRemoveNode, nil) +func (replica *Replica) RemoveNode(ctx context.Context, memberID uint64, destinationStorageName string) error { + return replica.proposeMembershipChange(ctx, string(removeNode), destinationStorageName, memberID, ConfChangeRemoveNode, nil) } // AddLearner implements RaftReplica.AddLearner -func (replica *Replica) AddLearner(ctx context.Context, address string) error { +func (replica *Replica) AddLearner(ctx context.Context, address, destinationStorageName string) error { memberID := uint64(replica.AppendedLSN() + 1) - return replica.proposeMembershipChange(ctx, string(addLearner), "", memberID, ConfChangeAddLearnerNode, &gitalypb.ReplicaID_Metadata{ + return replica.proposeMembershipChange(ctx, string(addLearner), destinationStorageName, memberID, ConfChangeAddLearnerNode, 
&gitalypb.ReplicaID_Metadata{ Address: address, }) } @@ -1086,12 +1087,12 @@ func (replica *Replica) proposeRemoveNode(ctx context.Context, changeType string return err } - return replica.proposeConfChange(ctx, changeType, memberID, ConfChangeRemoveNode, nil) + return replica.proposeConfChange(ctx, changeType, memberID, "", ConfChangeRemoveNode, nil) } func (replica *Replica) proposeAddNode(ctx context.Context, changeType, destinationStorageName string, memberID uint64, metadata *gitalypb.ReplicaID_Metadata) (returnedErr error) { // First, propose the configuration change - if err := replica.proposeConfChange(ctx, changeType, memberID, ConfChangeAddNode, metadata); err != nil { + if err := replica.proposeConfChange(ctx, changeType, memberID, destinationStorageName, ConfChangeAddNode, metadata); err != nil { return err } @@ -1100,7 +1101,7 @@ func (replica *Replica) proposeAddNode(ctx context.Context, changeType, destinat // If join fails, attempt to remove the node from the cluster replica.logger.WithError(err).Warn("join cluster failed, attempting to remove node") - if removeErr := replica.proposeConfChange(ctx, string(removeNode), memberID, ConfChangeRemoveNode, nil); removeErr != nil { + if removeErr := replica.proposeConfChange(ctx, string(removeNode), memberID, "", ConfChangeRemoveNode, nil); removeErr != nil { replica.logger.WithError(removeErr).Error("failed to remove node after join failure") } @@ -1111,13 +1112,14 @@ func (replica *Replica) proposeAddNode(ctx context.Context, changeType, destinat } func (replica *Replica) proposeAddLearner(ctx context.Context, changeType string, memberID uint64, metadata *gitalypb.ReplicaID_Metadata) error { - return replica.proposeConfChange(ctx, changeType, memberID, ConfChangeAddLearnerNode, metadata) + return replica.proposeConfChange(ctx, changeType, memberID, "", ConfChangeAddLearnerNode, metadata) } func (replica *Replica) proposeConfChange( ctx context.Context, changeType string, memberID uint64, + 
destinationStorageName string, confChangeType ConfChangeType, metadata *gitalypb.ReplicaID_Metadata, ) (returnedErr error) { @@ -1132,6 +1134,7 @@ func (replica *Replica) proposeConfChange( replica.node.Status().Term, uint64(replica.AppendedLSN()), replica.leadership.GetLeaderID(), + destinationStorageName, waiter.ID, metadata, ) diff --git a/internal/gitaly/storage/raftmgr/replica_conf_change.go b/internal/gitaly/storage/raftmgr/replica_conf_change.go index 354ecf19e21..07ac66cd251 100644 --- a/internal/gitaly/storage/raftmgr/replica_conf_change.go +++ b/internal/gitaly/storage/raftmgr/replica_conf_change.go @@ -29,18 +29,20 @@ type ReplicaConfChange struct { // a consistent interface for configuration changes regardless of the underlying // implementation (ConfChange or ConfChangeV2). type ReplicaConfChanges struct { - changes []ReplicaConfChange - metadata *gitalypb.ReplicaID_Metadata - term uint64 - index uint64 - leaderID uint64 - eventID EventID + changes []ReplicaConfChange + metadata *gitalypb.ReplicaID_Metadata + term uint64 + index uint64 + leaderID uint64 + destinationStorageName string + eventID EventID } // ConfChangeContext wraps both event ID and metadata for config changes type ConfChangeContext struct { - EventID EventID `json:"event_id,omitempty"` - Metadata *gitalypb.ReplicaID_Metadata `json:"metadata,omitempty"` + EventID EventID `json:"event_id,omitempty"` + Metadata *gitalypb.ReplicaID_Metadata `json:"metadata,omitempty"` + DestinationStorageName string `json:"destination_storage_name,omitempty"` } // NewReplicaConfChanges creates a new ReplicaConfChanges instance. 
@@ -48,16 +50,18 @@ func NewReplicaConfChanges( term uint64, index uint64, leaderID uint64, + destinationStorageName string, eventID EventID, metadata *gitalypb.ReplicaID_Metadata, ) *ReplicaConfChanges { return &ReplicaConfChanges{ - changes: make([]ReplicaConfChange, 0), - metadata: metadata, - term: term, - index: index, - leaderID: leaderID, - eventID: eventID, + changes: make([]ReplicaConfChange, 0), + metadata: metadata, + destinationStorageName: destinationStorageName, + term: term, + index: index, + leaderID: leaderID, + eventID: eventID, } } @@ -79,6 +83,11 @@ func (r *ReplicaConfChanges) Metadata() *gitalypb.ReplicaID_Metadata { return r.metadata } +// DestinationStorageName returns the destination storage name associated with the configuration changes. +func (r *ReplicaConfChanges) DestinationStorageName() string { + return r.destinationStorageName +} + // Term returns the term of the configuration changes. func (r *ReplicaConfChanges) Term() uint64 { return r.term @@ -144,8 +153,9 @@ func (r *ReplicaConfChanges) ToConfChangeV2() (raftpb.ConfChangeV2, error) { func (r *ReplicaConfChanges) encodeContext() ([]byte, error) { context := &ConfChangeContext{ - EventID: r.eventID, - Metadata: r.metadata, + EventID: r.eventID, + Metadata: r.metadata, + DestinationStorageName: r.destinationStorageName, } return json.Marshal(context) @@ -167,17 +177,17 @@ func parseChangeType(ccType raftpb.ConfChangeType) (ConfChangeType, error) { } } -func parseContext(context []byte) (EventID, *gitalypb.ReplicaID_Metadata, error) { +func parseContext(context []byte) (EventID, string, *gitalypb.ReplicaID_Metadata, error) { if len(context) == 0 { - return 0, nil, nil + return 0, "", nil, nil } var confChangeContext ConfChangeContext if err := json.Unmarshal(context, &confChangeContext); err != nil { - return 0, nil, fmt.Errorf("unmarshal context: %w", err) + return 0, "", nil, fmt.Errorf("unmarshal context: %w", err) } - return confChangeContext.EventID, 
confChangeContext.Metadata, nil + return confChangeContext.EventID, confChangeContext.DestinationStorageName, confChangeContext.Metadata, nil } // ParseConfChange parses a raftpb.Entry containing a configuration change directly into a ReplicaConfChanges. @@ -190,7 +200,7 @@ func ParseConfChange(entry raftpb.Entry, leaderID uint64) (*ReplicaConfChanges, return nil, fmt.Errorf("unmarshalling EntryConfChange: %w", err) } - eventID, metadata, err := parseContext(cc.Context) + eventID, destinationStorageName, metadata, err := parseContext(cc.Context) if err != nil { return nil, err } @@ -200,7 +210,7 @@ func ParseConfChange(entry raftpb.Entry, leaderID uint64) (*ReplicaConfChanges, return nil, err } - result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, eventID, metadata) + result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, destinationStorageName, eventID, metadata) result.AddChange(cc.NodeID, nodeType) return result, nil } else if entry.Type == raftpb.EntryConfChangeV2 { @@ -213,12 +223,12 @@ func ParseConfChange(entry raftpb.Entry, leaderID uint64) (*ReplicaConfChanges, return nil, fmt.Errorf("no changes in ConfChangeV2") } - eventID, metadata, err := parseContext(cc.Context) + eventID, destinationStorageName, metadata, err := parseContext(cc.Context) if err != nil { return nil, err } - result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, eventID, metadata) + result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, destinationStorageName, eventID, metadata) for _, change := range cc.Changes { nodeType, err := parseChangeType(change.Type) diff --git a/internal/gitaly/storage/raftmgr/replica_conf_change_test.go b/internal/gitaly/storage/raftmgr/replica_conf_change_test.go index eb85b085a6a..f2a65eecb7c 100644 --- a/internal/gitaly/storage/raftmgr/replica_conf_change_test.go +++ b/internal/gitaly/storage/raftmgr/replica_conf_change_test.go @@ -7,6 +7,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" ) 
+var destinationStorage = "test-storage" + func TestConfChangeContext_EncodeDecode(t *testing.T) { t.Parallel() @@ -15,7 +17,7 @@ func TestConfChangeContext_EncodeDecode(t *testing.T) { } t.Run("encode and decode with event ID and metadata", func(t *testing.T) { - changes := NewReplicaConfChanges(1, 2, 3, 12345, metadata) + changes := NewReplicaConfChanges(1, 2, 3, destinationStorage, 12345, metadata) // Encode contextBytes, err := changes.encodeContext() @@ -23,15 +25,16 @@ func TestConfChangeContext_EncodeDecode(t *testing.T) { require.NotEmpty(t, contextBytes) // Decode - eventID, decodedMetadata, err := parseContext(contextBytes) + eventID, actualDestinationStorageName, decodedMetadata, err := parseContext(contextBytes) require.NoError(t, err) require.Equal(t, EventID(12345), eventID) + require.Equal(t, destinationStorage, actualDestinationStorageName) require.NotNil(t, decodedMetadata) require.Equal(t, "unix:///tmp/test.socket", decodedMetadata.GetAddress()) }) t.Run("encode and decode with only event ID", func(t *testing.T) { - changes := NewReplicaConfChanges(1, 2, 3, 67890, nil) + changes := NewReplicaConfChanges(1, 2, 3, destinationStorage, 67890, nil) // Encode contextBytes, err := changes.encodeContext() @@ -39,35 +42,39 @@ func TestConfChangeContext_EncodeDecode(t *testing.T) { require.NotEmpty(t, contextBytes) // Decode - eventID, decodedMetadata, err := parseContext(contextBytes) + eventID, destinationStorageName, decodedMetadata, err := parseContext(contextBytes) require.NoError(t, err) require.Equal(t, EventID(67890), eventID) + require.Equal(t, destinationStorage, destinationStorageName) require.Nil(t, decodedMetadata) }) t.Run("encode and decode with only metadata", func(t *testing.T) { - changes := NewReplicaConfChanges(1, 2, 3, 0, metadata) + changes := NewReplicaConfChanges(1, 2, 3, destinationStorage, 0, metadata) contextBytes, err := changes.encodeContext() require.NoError(t, err) require.NotEmpty(t, contextBytes) - eventID, decodedMetadata, 
err := parseContext(contextBytes) + eventID, destinationStorageName, decodedMetadata, err := parseContext(contextBytes) require.NoError(t, err) require.Equal(t, EventID(0), eventID) + require.Equal(t, destinationStorage, destinationStorageName) require.NotNil(t, decodedMetadata) require.Equal(t, "unix:///tmp/test.socket", decodedMetadata.GetAddress()) }) t.Run("empty context", func(t *testing.T) { - eventID, metadata, err := parseContext(nil) + eventID, destinationStorageName, metadata, err := parseContext(nil) require.NoError(t, err) require.Equal(t, EventID(0), eventID) + require.Equal(t, "", destinationStorageName) require.Nil(t, metadata) - eventID, metadata, err = parseContext([]byte{}) + eventID, destinationStorageName, metadata, err = parseContext([]byte{}) require.NoError(t, err) require.Equal(t, EventID(0), eventID) + require.Equal(t, "", destinationStorageName) require.Nil(t, metadata) }) } diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index ac8cf18af53..600316e3b71 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -30,7 +30,7 @@ import ( ) const ( - waitTimeout = 5 * time.Second + waitTimeout = 10 * time.Second ) func raftConfigsForTest(t *testing.T) config.Raft { @@ -1743,11 +1743,11 @@ func TestReplica_AddNode(t *testing.T) { }, timeout, 5*time.Millisecond, "replica should become leader") } - // waitForVoters waits for both replicas to have the expected number of voters - waitForReplicaLeader := func(t *testing.T, replicaOne, replicaTwo *Replica, expectedVoters int, timeout time.Duration) { + // waitForReplicaLeader waits for both replicas to have the same leader + waitForReplicaLeader := func(t *testing.T, replicaOne, replicaTwo *Replica, timeout time.Duration) { require.Eventually(t, func() bool { return replicaTwo.leadership.GetLeaderID() == replicaOne.memberID - }, timeout, 5*time.Millisecond, "configuration should 
stabilize with %d voters", expectedVoters) + }, timeout, 5*time.Millisecond, "replica two should have the same leader as replica one") } drainNotificationQueues := func(t *testing.T, replicas ...*Replica) { @@ -1836,7 +1836,7 @@ func TestReplica_AddNode(t *testing.T) { drainNotificationQueues(t, replica, replicaTwo) - waitForReplicaLeader(t, replica, replicaTwo, 2, waitTimeout) + waitForReplicaLeader(t, replica, replicaTwo, waitTimeout) testhelper.RequirePromMetrics(t, metrics, ` # HELP gitaly_raft_log_entries_processed Rate of log entries processed. @@ -1977,7 +1977,7 @@ func TestReplica_AddNode(t *testing.T) { addedReplica := addressesToReplicas[addedAddress] // Wait for voters count to stabilize between both replicas - waitForReplicaLeader(t, replica, addedReplica, 2, waitTimeout) + waitForReplicaLeader(t, replica, addedReplica, waitTimeout) drainNotificationQueues(t, replica, addedReplica) @@ -2019,7 +2019,7 @@ func TestReplica_AddNode(t *testing.T) { // Wait for the replica to elect itself as leader waitForLeadership(t, replica, waitTimeout) - err := replica.RemoveNode(ctx, 999) + err := replica.RemoveNode(ctx, 999, storageName) require.EqualError(t, err, "translating member ID: no address found for memberID 999") }) @@ -2041,9 +2041,7 @@ func TestReplica_AddNode(t *testing.T) { // Set a random leader ID to simulate a non-leader replica.leadership.SetLeader(999, false) - destination := "default" - - err := replica.AddNode(ctx, "gitaly-node-2:8075", destination) + err := replica.AddNode(ctx, "gitaly-node-2:8075", storageName) require.EqualError(t, err, "replica is not the leader", "adding node should fail when not leader") testhelper.RequirePromMetrics(t, metrics, ` diff --git a/internal/gitaly/storage/raftmgr/routing_table.go b/internal/gitaly/storage/raftmgr/routing_table.go index 5fba4c93147..0ea3ed9d5c3 100644 --- a/internal/gitaly/storage/raftmgr/routing_table.go +++ b/internal/gitaly/storage/raftmgr/routing_table.go @@ -27,18 +27,12 @@ type 
RoutingTableEntry struct { Index uint64 } -// ReplicaMetadata contains additional information about a replica -// that is needed for routing messages. -type ReplicaMetadata struct { - Address string -} - // RoutingTable handles translation between member IDs and addresses type RoutingTable interface { Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) UpsertEntry(entry RoutingTableEntry) error - ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error + ApplyReplicaConfChange(partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error ListEntries() (map[string]*RoutingTableEntry, error) } @@ -145,7 +139,7 @@ func (r *kvRoutingTable) Translate(partitionKey *gitalypb.RaftPartitionKey, memb return nil, fmt.Errorf("no address found for memberID %d", memberID) } -func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error { +func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error { routingTableEntry, err := r.GetEntry(partitionKey) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("getting routing table entry: %w", err) @@ -162,6 +156,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey routingTableEntry.Index = changes.Index() metadata := changes.Metadata() + destinationStorageName := changes.DestinationStorageName() for _, confChange := range changes.Changes() { switch confChange.changeType { @@ -173,13 +168,13 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey if slices.ContainsFunc(routingTableEntry.Replicas, func(r *gitalypb.ReplicaID) bool { return r.GetMemberId() == confChange.memberID }) { - return fmt.Errorf("member ID %d 
already exists", confChange.memberID) + continue } replica := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: storageName, + StorageName: destinationStorageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, } @@ -199,7 +194,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey learner := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: storageName, + StorageName: destinationStorageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_LEARNER, } diff --git a/internal/gitaly/storage/raftmgr/routing_table_test.go b/internal/gitaly/storage/raftmgr/routing_table_test.go index e1daf265b4a..5b25f94435c 100644 --- a/internal/gitaly/storage/raftmgr/routing_table_test.go +++ b/internal/gitaly/storage/raftmgr/routing_table_test.go @@ -12,6 +12,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" ) +var destinationStorageName = "test-storage" + func TestPersistentRoutingTable(t *testing.T) { t.Parallel() @@ -34,7 +36,7 @@ func TestPersistentRoutingTable(t *testing.T) { { PartitionKey: partitionKey, MemberId: uint64(memberID), - StorageName: "test-storage", + StorageName: destinationStorageName, Metadata: &gitalypb.ReplicaID_Metadata{ Address: address, }, @@ -50,7 +52,6 @@ func TestPersistentRoutingTable(t *testing.T) { replica, err := rt.Translate(partitionKey, uint64(memberID)) require.NoError(t, err) require.Equal(t, address, replica.GetMetadata().GetAddress()) - require.Equal(t, "test-storage", replica.GetStorageName()) }) t.Run("stale entry rejected", func(t *testing.T) { @@ -112,10 +113,10 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) - changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(1, 
ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -166,10 +167,10 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*initialEntry) require.NoError(t, err) - changes := NewReplicaConfChanges(2, 2, 1, 1, nil) + changes := NewReplicaConfChanges(2, 2, 1, destinationStorageName, 1, nil) changes.AddChange(2, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -196,10 +197,10 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) - changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddLearnerNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -223,10 +224,10 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) - changes := NewReplicaConfChanges(1, 1, 0, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 0, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(0, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID should be non-zero") }) @@ -246,21 +247,25 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) // First add a node 
- changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) // Try to add the same node ID again - changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) + changes = NewReplicaConfChanges(2, 2, 1, destinationStorageName, 1, createMetadata("localhost:5678")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) - require.Error(t, err) - require.Contains(t, err.Error(), "member ID 1 already exists") - }) + err = rt.ApplyReplicaConfChange(partitionKey, changes) + require.NoError(t, err) + updatedEntry, err := rt.GetEntry(partitionKey) + require.NoError(t, err) + require.Len(t, updatedEntry.Replicas, 1) + require.Equal(t, uint64(1), updatedEntry.Replicas[0].GetMemberId()) + require.Equal(t, "localhost:1234", updatedEntry.Replicas[0].GetMetadata().GetAddress()) + }) t.Run("should error when updating non-existent member ID", func(t *testing.T) { t.Parallel() @@ -276,17 +281,17 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) // Add a node with ID 1 - changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, "test-storage", 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) // Try to update a non-existent node with ID 2 - changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) + changes = NewReplicaConfChanges(2, 2, 1, "test-storage", 1, 
createMetadata("localhost:5678")) changes.AddChange(2, ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID 2 not found for update") }) @@ -321,10 +326,10 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*entry) require.NoError(t, err) - changes := NewReplicaConfChanges(entry.Term, entry.Index, 1, 1, nil) + changes := NewReplicaConfChanges(entry.Term, entry.Index, 1, "test-storage", 1, nil) changes.AddChange(1, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "no replicas to upsert") }) @@ -360,10 +365,10 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*initialEntry) require.NoError(t, err) - changes := NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) + changes := NewReplicaConfChanges(2, 2, 1, "test-storage", 1, createMetadata("localhost:5678")) changes.AddChange(1, ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -405,11 +410,11 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*initialEntry) require.NoError(t, err) - changes := NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:8888")) + changes := NewReplicaConfChanges(2, 2, 1, "test-storage", 1, createMetadata("localhost:8888")) changes.AddChange(1, ConfChangeRemoveNode) changes.AddChange(2, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) -- GitLab 
From d3d668b5b04c4d6b7ed149dc59a104ed8a9a3f40 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 23:10:30 +0530 Subject: [PATCH 4/8] raft: Pass partitionInfo through context in raftFactory This commit updates the raft factory to extract and use partition information from the context when creating replicas. --- .../service/raft/get_partitions_test.go | 1 + internal/gitaly/storage/raftmgr/replica.go | 20 +++++-------- .../gitaly/storage/raftmgr/replica_test.go | 30 ++++++++++++------- .../gitaly/storage/raftmgr/testhelper_test.go | 5 +++- .../storage/storagemgr/partition/factory.go | 23 +++++++++----- 5 files changed, 47 insertions(+), 32 deletions(-) diff --git a/internal/gitaly/service/raft/get_partitions_test.go b/internal/gitaly/service/raft/get_partitions_test.go index 7b6e6058036..e90e140128f 100644 --- a/internal/gitaly/service/raft/get_partitions_test.go +++ b/internal/gitaly/service/raft/get_partitions_test.go @@ -371,6 +371,7 @@ func TestServer_GetClusterInfo_LiveStateVsRoutingTable(t *testing.T) { ctx, 1, // memberID partitionKey, + "@hashed/test/repo.git", cfg.Raft, logStore, raftStorage, diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index d50f1deb650..fdd32e79535 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -234,9 +234,7 @@ func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ReplicaOptions, error // This factory is used to create and initialize Replica objects for partitions. 
type RaftReplicaFactory func( ctx context.Context, - memberID uint64, storageName string, - partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -247,24 +245,26 @@ type RaftReplicaFactory func( func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionFunc) RaftReplicaFactory { return func( ctx context.Context, - memberID uint64, storageName string, - partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, ) (*Replica, error) { - storage, err := raftNode.GetStorage(storageName) + raftStorage, err := raftNode.GetStorage(storageName) if err != nil { return nil, fmt.Errorf("get storage %q: %w", storageName, err) } - raftEnabledStorage, ok := storage.(*RaftEnabledStorage) + raftEnabledStorage, ok := raftStorage.(*RaftEnabledStorage) if !ok { return nil, fmt.Errorf("storage %q is not a RaftEnabledStorage", storageName) } + partitionInfo := storage.ExtractPartitionInfo(ctx) + partitionKey := partitionInfo.PartitionKey + memberID := partitionInfo.MemberID + relativePath := partitionInfo.RelativePath - replica, err := NewReplica(ctx, memberID, partitionKey, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) + replica, err := NewReplica(ctx, memberID, partitionKey, relativePath, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) 
if err != nil { return nil, fmt.Errorf("create replica %q: %w", storageName, err) } @@ -286,6 +286,7 @@ func NewReplica( ctx context.Context, memberID uint64, partitionKey *gitalypb.RaftPartitionKey, + relativePath string, raftCfg config.Raft, logStore *ReplicaLogStore, raftEnabledStorage *RaftEnabledStorage, @@ -297,11 +298,6 @@ func NewReplica( return nil, fmt.Errorf("raft is not enabled") } - relativePath, err := storage.ExtractPartitioningHint(ctx) - if err != nil { - return nil, fmt.Errorf("extract partitioning hint: %w", err) - } - options, err := applyOptions(raftCfg, opts) if err != nil { return nil, fmt.Errorf("invalid raft replica option: %w", err) diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index 600316e3b71..e41990957cf 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -239,8 +239,9 @@ func TestReplica_Initialize(t *testing.T) { require.NoError(t, err) raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") - mgr, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) + mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -318,7 +319,8 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -363,7 +365,8 @@ func 
TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr1, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore1, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr1, err := raftFactory(ctx, storageName, logStore1, logger, metrics) require.NoError(t, err) // Initialize the raft replica @@ -425,7 +428,8 @@ func TestReplica_Initialize(t *testing.T) { logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) - mgr2, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore2, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr2, err := raftFactory(ctx, storageName, logStore2, logger, metrics) require.NoError(t, err) // Re-initialize Raft with the highest LSN @@ -709,11 +713,10 @@ func TestReplica_AppendLogEntry(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder), WithOpTimeout(1*time.Nanosecond)) // Create replica with very short operation timeout + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") mgr, err := raftFactory( ctx, - 1, storageName, - NewPartitionKey(storageName, partitionID), logStore, logger, metrics, @@ -831,7 +834,8 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) // Configure replica - mgr, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr, err := raftFactory(ctx, storageName, logStore, logger, 
metrics) require.NoError(t, err) for _, f := range setupFuncs { @@ -884,7 +888,8 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(env.recorder)) - recoveryMgr, err := raftFactory(ctx, 1, env.storageName, NewPartitionKey(env.storageName, env.partitionID), logStore, logger, env.metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(env.storageName, env.partitionID), 1, "") + recoveryMgr, err := raftFactory(ctx, env.storageName, logStore, logger, env.metrics) require.NoError(t, err) // Initialize with the last known LSN @@ -1665,7 +1670,8 @@ func TestReplica_StorageConnection(t *testing.T) { // Create factory that connects replicas to storage raftFactory := DefaultFactoryWithNode(cfg.Raft, raftNode) - replica, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + replica, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, replica.Close()) }) @@ -1696,14 +1702,16 @@ func TestReplica_StorageConnection(t *testing.T) { }) t.Run("multiple replicas for same partition key", func(t *testing.T) { - duplicateReplica, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + duplicateReplica, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, duplicateReplica) }) t.Run("Register different replicas for different partition keys", func(t *testing.T) { partitionID := storage.PartitionID(2) - replicaTwo, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) + ctx = 
storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + replicaTwo, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, replicaTwo) }) diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index 0b6dc03e05a..960bc1badea 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -103,6 +103,9 @@ func createRaftReplica(t *testing.T, ctx context.Context, memberID uint64, addre Address: address, Options: opts, } + relativePath := "git.git" + storageName := "default" + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), memberID, relativePath) return createRaftReplicaWithConfig(t, ctx, raftCfg, config, metrics) } @@ -141,7 +144,7 @@ func createRaftReplicaWithConfig(t *testing.T, ctx context.Context, raftCfg conf } raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, config.Options...) - return raftFactory(ctx, config.MemberID, storageName, NewPartitionKey(storageName, config.PartitionID), logStore, logger, metrics) + return raftFactory(ctx, storageName, logStore, logger, metrics) } func createTempServer(t *testing.T, transport *GrpcTransport) (string, *grpc.Server) { diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 98118781be3..59ee1150927 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -65,12 +65,21 @@ func (f Factory) New( if f.raftCfg.Enabled { factory := f.raftFactory - // TODO: The PartitionKey will be retrieved from the incoming context when a Raft - // replica is booted up. This allows us to maintain a persistent key across - // all replicas of the partition. 
- // storageName will then represent the target storage where we intend on - // placing the new replicated partition. - partitionKey := raftmgr.NewPartitionKey(storageName, partitionID) + partitionInfo := storage.ExtractPartitionInfo(ctx) + if partitionInfo.PartitionKey == nil { + // If partitionKey is not set, it means: + // - It's a first self bootstrapped node in the cluster, + // so we need to set the partitionKey using local storage and partitionID + // and we will set the memberID to 1. + // - A node was previously part of a cluster (with a different member ID) + // and later becomes leader through normal Raft leader election + // and receives requests from Rails without partition context + // so we need a way to retrieve the partitionKey and memberID which was + // originally used by the replica before it became leader. + // https://gitlab.com/gitlab-org/gitaly/-/issues/6877 + partitionKey := raftmgr.NewPartitionKey(storageName, partitionID) + ctx = storage.ContextWithPartitionInfo(ctx, partitionKey, 1, partitionInfo.RelativePath) + } absoluteStateDir = getRaftPartitionPath(storageName, partitionID, absoluteStateDir) @@ -91,9 +100,7 @@ func (f Factory) New( } raftReplica, err := factory( ctx, - 1, storageName, - partitionKey, replicaLogStore, logger, f.partitionMetrics.raft, -- GitLab From 9422a077e3bafc8747f8b1b395939f1a8e9f4369 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 23:22:29 +0530 Subject: [PATCH 5/8] raft: Introduce a method to create raft replica via transaction This commit updates the `GrpcTransport.Receive` to accept relativePath and storageManager which would be used by CreateReplicaViaTransaction to initiate a transaction. The transaction will assign a local partitionID. 
--- internal/gitaly/service/raft/join_cluster.go | 37 +++++++++++++++++++ internal/gitaly/service/raft/send_message.go | 2 +- .../gitaly/service/raft/testhelper_test.go | 4 ++ .../gitaly/storage/raftmgr/grpc_transport.go | 25 +++++-------- internal/gitaly/storage/raftmgr/replica.go | 8 ++++ .../gitaly/storage/raftmgr/testhelper_test.go | 5 +++ 6 files changed, 64 insertions(+), 17 deletions(-) diff --git a/internal/gitaly/service/raft/join_cluster.go b/internal/gitaly/service/raft/join_cluster.go index 31ec54459bf..26c2be1b94c 100644 --- a/internal/gitaly/service/raft/join_cluster.go +++ b/internal/gitaly/service/raft/join_cluster.go @@ -2,8 +2,10 @@ package raft import ( "context" + "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v18/internal/structerr" "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" @@ -90,3 +92,38 @@ func (s *Server) validateJoinClusterRequest(req *gitalypb.JoinClusterRequest) er return nil } + +func (s *Server) createReplicaViaTransaction(ctx context.Context, relativePath string, storageManager storage.Storage, replicaRegistry raftmgr.ReplicaRegistry, partitionKey *gitalypb.RaftPartitionKey) (_ raftmgr.RaftReplica, returnedErr error) { + tx, err := storageManager.Begin(ctx, storage.TransactionOptions{ + RelativePath: relativePath, + AllowPartitionAssignmentWithoutRepository: true, + }) + if err != nil { + return nil, fmt.Errorf("begin bootstrap transaction: %w", err) + } + + replica, err := replicaRegistry.GetReplica(partitionKey) + if err != nil { + return nil, fmt.Errorf("replica not found after partition creation: %w", err) + } + + started := replica.(*raftmgr.Replica).IsStarted() + if !started { + return nil, fmt.Errorf("replica is not started") + } + + defer func() { + if returnedErr != nil { + if err := tx.Rollback(ctx); err != nil { + returnedErr = errors.Join(err, fmt.Errorf("rollback: %w", err)) + } + } else 
{ + commitLSN, err := tx.Commit(ctx) + if err != nil { + returnedErr = errors.Join(err, fmt.Errorf("fail to commit transaction: commit LSN: %d: %w", commitLSN, err)) + } + } + }() + + return replica, nil +} diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go index af3dd00040b..da97b8fb0e9 100644 --- a/internal/gitaly/service/raft/send_message.go +++ b/internal/gitaly/service/raft/send_message.go @@ -33,7 +33,7 @@ func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) erro storageManager, err := node.GetStorage(storageName) if err != nil { - return structerr.NewInternal("get storage manager: %w", err) + return structerr.NewInternal("get storage manager: storage name %s, %w", storageName, err) } raftStorage, ok := storageManager.(*raftmgr.RaftEnabledStorage) diff --git a/internal/gitaly/service/raft/testhelper_test.go b/internal/gitaly/service/raft/testhelper_test.go index c9d03a6fa02..8dcafadb5d2 100644 --- a/internal/gitaly/service/raft/testhelper_test.go +++ b/internal/gitaly/service/raft/testhelper_test.go @@ -28,6 +28,10 @@ func (m *mockRaftReplica) Step(ctx context.Context, msg raftpb.Message) error { return nil } +func (m *mockRaftReplica) IsStarted() bool { + return true +} + func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raftmgr.Node) gitalypb.RaftServiceClient { serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { deps.Cfg = cfg diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go index fb05226114f..17816c095dc 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport.go @@ -131,27 +131,20 @@ func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReade t.mutex.Unlock() } - replica, err := t.routingTable.Translate(partitionKey, msg.To) + replicaID, err := 
t.routingTable.Translate(partitionKey, msg.To) if err != nil { return fmt.Errorf("translate memberID %d: %w", msg.To, err) } - addr := replica.GetMetadata().GetAddress() + addr := replicaID.GetMetadata().GetAddress() messagesByAddressMutex.Lock() // We are not adding address in the request because it will increase the payload size, and // is not needed on the receiver end. messagesByAddress[addr] = append(messagesByAddress[addr], &gitalypb.RaftMessageRequest{ ClusterId: t.cfg.Raft.ClusterID, - ReplicaId: &gitalypb.ReplicaID{ - PartitionKey: partitionKey, - MemberId: msg.To, - StorageName: replica.GetStorageName(), - Metadata: &gitalypb.ReplicaID_Metadata{ - Address: replica.GetMetadata().GetAddress(), - }, - }, - Message: &msg, + ReplicaId: replicaID, + Message: &msg, }) messagesByAddressMutex.Unlock() return nil @@ -213,13 +206,13 @@ func (t *GrpcTransport) packLogData(ctx context.Context, lsn storage.LSN, messag // Receive receives a stream of Raft messages and processes them. func (t *GrpcTransport) Receive(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, raftMsg raftpb.Message) error { // Retrieve the replica from the registry, assumption is that all the messages are from the same partition key. 
+ var replica RaftReplica replica, err := t.registry.GetReplica(partitionKey) if err != nil { - // If the replica is not found, return nil until we add a way to bootstrap the replica - https://gitlab.com/gitlab-org/gitaly/-/issues/6304 - if errors.Is(err, errNoReplicaFound) { - return nil - } - return err + t.logger.WithFields(log.Fields{ + "raft.partition_key": partitionKey.GetValue(), + }).Error("replica has not been created yet") + return nil } for _, entry := range raftMsg.Entries { diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index fdd32e79535..d3ed554983c 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -64,6 +64,9 @@ type RaftReplica interface { // This operation can only be performed by the leader. AddLearner(ctx context.Context, address, destinationStorageName string) error + // IsStarted returns true if the replica has been started. + IsStarted() bool + // GetCurrentState returns comprehensive current state information including term, index, and Raft state. // This provides an efficient way to get multiple state values with consistent locking. 
GetCurrentState() *ReplicaState @@ -1014,6 +1017,11 @@ func (replica *Replica) signalError(err error) { replica.ready.set(err) } +// IsStarted implements RaftReplica.IsStarted +func (replica *Replica) IsStarted() bool { + return replica.started +} + // Step processes a Raft message from a remote node func (replica *Replica) Step(ctx context.Context, msg raftpb.Message) error { if !replica.started { diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index 960bc1badea..615b04fbedc 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -71,6 +71,11 @@ func (mc *mockConsumer) GetNotifications() []mockNotification { return mc.notifications } +// IsStarted is a mock implementation of RaftReplica.IsStarted +func (m *mockReplica) IsStarted() bool { + return true +} + func openTestDB(t *testing.T, ctx context.Context, cfg config.Cfg, logger logger.Logger) *databasemgr.DBManager { dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) require.NoError(t, err) -- GitLab From 745764305f3ed4cb3b70fcb20aca4a7311d7880c Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 23:24:46 +0530 Subject: [PATCH 6/8] raft: Export MockConsumer We were earlier testing the membership methods in the raftmgr package using fake servers, which has limitations, and the tests were not truly end to end. It would be better to gradually move some tests to the service/raft package; for this we need to export some mocks which will be used by the e2e tests. 
--- internal/gitaly/storage/raftmgr/mock.go | 39 +++++++++++++++ .../storage/raftmgr/replica_log_store_test.go | 50 +++++++++---------- .../gitaly/storage/raftmgr/replica_test.go | 14 +++--- .../gitaly/storage/raftmgr/testhelper_test.go | 33 +----------- 4 files changed, 73 insertions(+), 63 deletions(-) create mode 100644 internal/gitaly/storage/raftmgr/mock.go diff --git a/internal/gitaly/storage/raftmgr/mock.go b/internal/gitaly/storage/raftmgr/mock.go new file mode 100644 index 00000000000..521ca49d915 --- /dev/null +++ b/internal/gitaly/storage/raftmgr/mock.go @@ -0,0 +1,39 @@ +package raftmgr + +import ( + "sync" + + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" +) + +// MockConsumer is a mock implementation of the LogConsumer interface. +type MockConsumer struct { + notifications []mockNotification + mutex sync.Mutex +} + +type mockNotification struct { + storageName string + partitionID storage.PartitionID + lowWaterMark storage.LSN + highWaterMark storage.LSN +} + +// NotifyNewEntries is a mock implementation of the LogConsumer.NotifyNewEntries method. +func (mc *MockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { + mc.mutex.Lock() + defer mc.mutex.Unlock() + mc.notifications = append(mc.notifications, mockNotification{ + storageName: storageName, + partitionID: partitionID, + lowWaterMark: lowWaterMark, + highWaterMark: committedLSN, + }) +} + +// GetNotifications is a mock implementation of the LogConsumer.GetNotifications method. 
+func (mc *MockConsumer) GetNotifications() []mockNotification { + mc.mutex.Lock() + defer mc.mutex.Unlock() + return mc.notifications +} diff --git a/internal/gitaly/storage/raftmgr/replica_log_store_test.go b/internal/gitaly/storage/raftmgr/replica_log_store_test.go index 89266414f57..9b415264e6f 100644 --- a/internal/gitaly/storage/raftmgr/replica_log_store_test.go +++ b/internal/gitaly/storage/raftmgr/replica_log_store_test.go @@ -24,7 +24,7 @@ func setupLogStore(t *testing.T, ctx context.Context, cfg config.Cfg) *ReplicaLo logger := testhelper.NewLogger(t) db := getTestDBManager(t, ctx, cfg, logger) posTracker := log.NewPositionTracker() - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) initStatus, err := logStore.initialize(ctx, 0) @@ -69,7 +69,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { canceledCtx, cancel := context.WithCancel(ctx) cancel() // Cancel immediately to force failure - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) // Initialize with canceled context should fail with InitStatusUnknown @@ -98,7 +98,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // to verify it properly bootstraps from the saved state // Use a new position tracker to avoid "already registered" errors posTracker2 := log.NewPositionTracker() - rs2, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker2, logger, NewMetrics()) + rs2, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, 
stagingDir, stateDir, &MockConsumer{}, posTracker2, logger, NewMetrics()) require.NoError(t, err) // It should now initialize as bootstrapped @@ -121,7 +121,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Pre-populate n entries prepopulateEntries(t, ctx, cfg, stagingDir, stateDir, appended) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) _, err = logStore.initialize(ctx, 0) @@ -147,7 +147,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { db := getTestDBManager(t, ctx, cfg, logger) posTracker := log.NewPositionTracker() - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -163,7 +163,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(0), lastIndex) - require.Empty(t, logStore.consumer.(*mockConsumer).GetNotifications()) + require.Empty(t, logStore.consumer.(*MockConsumer).GetNotifications()) }) t.Run("raft log store was bootstrapped, no left-over log entries after restart", func(t *testing.T) { @@ -179,7 +179,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { db, stateDir := prepopulateLogStore(t, ctx, cfg, 3, 3) // Restart the log store using the same state dir - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, testhelper.TempDir(t), stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, 
testhelper.TempDir(t), stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -211,7 +211,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { lowWaterMark: storage.LSN(4), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) }) t.Run("raft log store was bootstrapped, some log entries are left over", func(t *testing.T) { @@ -226,7 +226,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Simulate a prior session db, stateDir := prepopulateLogStore(t, ctx, cfg, 5, 3) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, testhelper.TempDir(t), stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, testhelper.TempDir(t), stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -254,7 +254,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { lowWaterMark: storage.LSN(3), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) }) } @@ -314,7 +314,7 @@ func TestReplicaLogStore_InitializeExistingPartition(t *testing.T) { require.NoError(t, localLog.Close()) // Now initialize a raft log store on the same directories - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -368,7 +368,7 @@ func TestReplicaLogStore_InitializeExistingPartition(t *testing.T) { 
metrics := NewMetrics() // PHASE 1: Initialize with Raft enabled - logStore1, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore1, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) require.NoError(t, err) initStatus, err := logStore1.initialize(ctx, 0) @@ -433,7 +433,7 @@ func TestReplicaLogStore_InitializeExistingPartition(t *testing.T) { db, stagingDir, stateDir, - &mockConsumer{}, + &MockConsumer{}, log.NewPositionTracker(), // Create a new position tracker logger, metrics, @@ -912,7 +912,7 @@ func TestReplicaLogStore_Term(t *testing.T) { logger := testhelper.NewLogger(t) db := getTestDBManager(t, ctx, cfg, logger) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) _, err = logStore.initialize(ctx, 0) @@ -925,7 +925,7 @@ func TestReplicaLogStore_Term(t *testing.T) { require.NoError(t, logStore.close()) // Now restart the log store - logStore, err = NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err = NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -1242,7 +1242,7 @@ func testAppendLogEntry(t *testing.T, appendFunc func(*testing.T, context.Contex prepopulateEntries(t, ctx, cfg, stagingDir, stateDir, 10) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + 
logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) _, err = logStore.initialize(ctx, 0) require.NoError(t, err) @@ -1295,7 +1295,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { } // Has not received any notification, yet. Highest appendedLSN is 3. - require.Empty(t, logStore.consumer.(*mockConsumer).GetNotifications()) + require.Empty(t, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 1 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 1, Vote: 1, Term: 1})) @@ -1309,7 +1309,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 2 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 2, Vote: 1, Term: 1})) @@ -1329,7 +1329,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(2), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 3 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 3, Vote: 1, Term: 1})) @@ -1355,7 +1355,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) }) t.Run("notify consumer since the low water mark", func(t *testing.T) { @@ -1377,7 +1377,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { } // Has not received any notification, yet. Highest appendedLSN is 3. 
- require.Empty(t, logStore.consumer.(*mockConsumer).GetNotifications()) + require.Empty(t, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 1 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 1, Vote: 1, Term: 1})) @@ -1391,7 +1391,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Simulate applying up to log entry 1 require.NoError(t, logStore.localLog.AcknowledgePosition(log.AppliedPosition, storage.LSN(1))) @@ -1415,7 +1415,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(2), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 3, but don't update low water mark require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 3, Vote: 1, Term: 1})) @@ -1441,14 +1441,14 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Simulate applying up to log entry 3 require.NoError(t, logStore.localLog.AcknowledgePosition(log.AppliedPosition, storage.LSN(3))) require.Equal(t, storage.LSN(4), logStore.localLog.LowWaterMark()) // No new notifications are sent. 
- require.Equal(t, 3, len(logStore.consumer.(*mockConsumer).GetNotifications())) + require.Equal(t, 3, len(logStore.consumer.(*MockConsumer).GetNotifications())) }) t.Run("reject LSN beyond appendedLSN", func(t *testing.T) { diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index e41990957cf..c9263ff7c35 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -232,7 +232,7 @@ func TestReplica_Initialize(t *testing.T) { require.NoError(t, localLog.Close()) // Now create a Raft log store pointing to the same directories - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -311,7 +311,7 @@ func TestReplica_Initialize(t *testing.T) { require.NoError(t, localLog.Close()) // Now create a Raft log store pointing to the same directories - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -357,7 +357,7 @@ func TestReplica_Initialize(t *testing.T) { metrics := NewMetrics() // PHASE 1: Create a new partition with Raft enabled - logStore1, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore1, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, 
log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -425,7 +425,7 @@ func TestReplica_Initialize(t *testing.T) { } // PHASE 3: Re-enable Raft - logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") @@ -704,7 +704,7 @@ func TestReplica_AppendLogEntry(t *testing.T) { db, dbMgr := dbSetup(t, ctx, cfg, testhelper.TempDir(t), storageName, logger) // Create a Raft log store - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -825,7 +825,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { db, err := dbMgr.GetDB(cfg.Storages[0].Name) require.NoError(t, err) - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, metrics) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, metrics) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -880,7 +880,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { db, err := dbMgr.GetDB(env.cfg.Storages[0].Name) require.NoError(t, err) - logStore, err := NewReplicaLogStore(env.storageName, env.partitionID, raftCfg, db, env.stagingDir, 
env.stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, env.metrics) + logStore, err := NewReplicaLogStore(env.storageName, env.partitionID, raftCfg, db, env.stagingDir, env.stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, env.metrics) require.NoError(t, err) raftNode, err := NewNode(env.cfg, logger, dbMgr, nil) diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index 615b04fbedc..383032c8a04 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -3,7 +3,6 @@ package raftmgr import ( "context" "net" - "sync" "testing" "github.com/stretchr/testify/require" @@ -42,35 +41,6 @@ func (m *mockReplica) Step(ctx context.Context, msg raftpb.Message) error { return nil } -type mockConsumer struct { - notifications []mockNotification - mutex sync.Mutex -} - -type mockNotification struct { - storageName string - partitionID storage.PartitionID - lowWaterMark storage.LSN - highWaterMark storage.LSN -} - -func (mc *mockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { - mc.mutex.Lock() - defer mc.mutex.Unlock() - mc.notifications = append(mc.notifications, mockNotification{ - storageName: storageName, - partitionID: partitionID, - lowWaterMark: lowWaterMark, - highWaterMark: committedLSN, - }) -} - -func (mc *mockConsumer) GetNotifications() []mockNotification { - mc.mutex.Lock() - defer mc.mutex.Unlock() - return mc.notifications -} - // IsStarted is a mock implementation of RaftReplica.IsStarted func (m *mockReplica) IsStarted() bool { return true @@ -110,6 +80,7 @@ func createRaftReplica(t *testing.T, ctx context.Context, memberID uint64, addre } relativePath := "git.git" storageName := "default" + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), memberID, relativePath) return createRaftReplicaWithConfig(t, ctx, 
raftCfg, config, metrics) @@ -132,7 +103,7 @@ func createRaftReplicaWithConfig(t *testing.T, ctx context.Context, raftCfg conf stateDir := testhelper.TempDir(t) posTracker := log.NewPositionTracker() - logStore, err := NewReplicaLogStore(storageName, config.PartitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore, err := NewReplicaLogStore(storageName, config.PartitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) if err != nil { return nil, err } -- GitLab From 5f58a45cfe83d1c25cbd0c1935fd3dc9a3ef1049 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 23:33:57 +0530 Subject: [PATCH 7/8] raft: Support creating replicas through JoinCluster RPC This commit enables new nodes to join existing clusters via the JoinCluster RPC. --- internal/gitaly/service/raft/join_cluster.go | 14 +- .../gitaly/service/raft/join_cluster_test.go | 134 ++++++++++++++---- .../gitaly/service/raft/send_message_test.go | 2 +- .../gitaly/service/raft/send_snapshot_test.go | 4 +- .../gitaly/service/raft/testhelper_test.go | 17 ++- internal/gitaly/storage/raftmgr/replica.go | 17 +++ 6 files changed, 153 insertions(+), 35 deletions(-) diff --git a/internal/gitaly/service/raft/join_cluster.go b/internal/gitaly/service/raft/join_cluster.go index 26c2be1b94c..77e259df4a5 100644 --- a/internal/gitaly/service/raft/join_cluster.go +++ b/internal/gitaly/service/raft/join_cluster.go @@ -42,14 +42,24 @@ func (s *Server) JoinCluster(ctx context.Context, req *gitalypb.JoinClusterReque RelativePath: req.GetRelativePath(), Replicas: req.GetReplicas(), LeaderID: req.GetLeaderId(), - Term: req.GetTerm(), - Index: req.GetIndex(), } if err := routingTable.UpsertEntry(*routingEntry); err != nil { return nil, structerr.NewInternal("failed to update routing table: %w", err) } + ctx = storage.ContextWithPartitionInfo(ctx, req.GetPartitionKey(), req.GetMemberId(), req.GetRelativePath()) + + replicaRegistry := 
raftEnabledStorage.GetReplicaRegistry() + raftReplica, err := s.createReplicaViaTransaction(ctx, req.GetRelativePath(), raftEnabledStorage, replicaRegistry, req.GetPartitionKey()) + if err != nil { + return &gitalypb.JoinClusterResponse{}, structerr.NewInternal("failed to create replica: %w", err) + } + + if !raftReplica.IsStarted() { + return &gitalypb.JoinClusterResponse{}, structerr.NewInternal("replica is not started") + } + return nil, nil } diff --git a/internal/gitaly/service/raft/join_cluster_test.go b/internal/gitaly/service/raft/join_cluster_test.go index 6bf04afbc08..fe41aede2a8 100644 --- a/internal/gitaly/service/raft/join_cluster_test.go +++ b/internal/gitaly/service/raft/join_cluster_test.go @@ -1,57 +1,109 @@ package raft import ( + "context" + "fmt" "path/filepath" "slices" "testing" + "time" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue/databasemgr" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/raftmgr" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v18/internal/helper" "gitlab.com/gitlab-org/gitaly/v18/internal/log" "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testserver" 
"gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" + "google.golang.org/grpc" "google.golang.org/grpc/codes" ) const ( testClusterID = "test-cluster" - testAuthorityName = "test-authority" testStorageName = "default" + testAuthorityName = "test-authority" testMemberID = uint64(3) - testRelativePath = "relative/path/to/repo" + testRelativePath = "relative/path/to/repo.git" ) -func TestJoinCluster(t *testing.T) { - t.Parallel() - +// createRaftNodeWithStorage creates a Raft enabled Gitaly node with a base storage. +func createRaftNodeWithStorage(t *testing.T, storageName string) (*raftmgr.Node, config.Cfg, error) { ctx := testhelper.Context(t) - cfg := testcfg.Build(t, testcfg.WithStorages(testStorageName)) - cfg.Raft.ClusterID = testClusterID logger := testhelper.SharedLogger(t) - dbPath := testhelper.TempDir(t) - dbMgr, err := databasemgr.NewDBManager( - ctx, - cfg.Storages, - func(logger log.Logger, path string) (keyvalue.Store, error) { - return keyvalue.NewBadgerStore(logger, filepath.Join(dbPath, path)) - }, - helper.NewNullTickerFactory(), - logger, - ) - require.NoError(t, err) + cfg := testcfg.Build(t, testcfg.WithStorages(storageName)) + cfg.Raft = raftConfigsForTest(t) + + dbMgr := setupDB(t, ctx, logger, cfg) t.Cleanup(dbMgr.Close) - mockNode, err := raftmgr.NewNode(cfg, logger, dbMgr, nil) + conns := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())) + t.Cleanup(func() { + err := conns.Close() + require.NoError(t, err) + }) + + nodeTwo, err := raftmgr.NewNode(cfg, logger, dbMgr, conns) + require.NoError(t, err) + + metrics := storagemgr.NewMetrics(cfg.Prometheus) + gitCmdFactory := gittest.NewCommandFactory(t, cfg) + locator := config.NewLocator(cfg) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + partitionFactoryOptions := []partition.FactoryOption{ + partition.WithRaftConfig(cfg.Raft), + partition.WithRaftFactory(raftmgr.DefaultFactoryWithNode(cfg.Raft, nodeTwo)), + 
partition.WithCmdFactory(gitCmdFactory), + partition.WithRepoFactory(localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache)), + partition.WithMetrics(partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus))), + } + nodeMgr, err := node.NewManager(cfg.Storages, storagemgr.NewFactory(logger, dbMgr, partition.NewFactory(partitionFactoryOptions...), 2, metrics)) require.NoError(t, err) + t.Cleanup(nodeMgr.Close) - client := runRaftServer(t, ctx, cfg, mockNode) + // Setup the base storage for the node two to support running transactions. + for _, storageCfg := range cfg.Storages { + baseStorage, err := nodeMgr.GetStorage(storageCfg.Name) + require.NoError(t, err) + require.NoError(t, nodeTwo.SetBaseStorage(storageCfg.Name, baseStorage)) + } - partitionKey := raftmgr.NewPartitionKey(testAuthorityName, 1) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { + deps.Cfg = cfg + deps.Node = nodeTwo + gitalypb.RegisterRaftServiceServer(srv, NewServer(deps)) + }, testserver.WithDisablePraefect()) + + return nodeTwo, cfg, nil +} + +func TestJoinCluster(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + partitionKey := raftmgr.NewPartitionKey(testStorageName, 1) + + // create a second Gitaly server + node, cfg, err := createRaftNodeWithStorage(t, testStorageName) + require.NoError(t, err) + conn := gittest.DialService(t, ctx, cfg) + clientTwo := gitalypb.NewRaftServiceClient(conn) + require.NoError(t, err) testCases := []struct { desc string @@ -87,7 +139,7 @@ func TestJoinCluster(t *testing.T) { Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, - MemberId: 1, + MemberId: 2, StorageName: testStorageName, Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, }, @@ -252,13 +304,13 @@ func TestJoinCluster(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { t.Parallel() - resp, err := client.JoinCluster(ctx, tc.req) + resp, err := clientTwo.JoinCluster(ctx, 
tc.req) if tc.expectedCode == codes.OK { require.NoError(t, err) require.NotNil(t, resp) - storage, err := mockNode.GetStorage(testStorageName) + storage, err := node.GetStorage(testStorageName) require.NoError(t, err) raftStorage := storage.(*raftmgr.RaftEnabledStorage) @@ -266,13 +318,11 @@ func TestJoinCluster(t *testing.T) { require.NotNil(t, routingTable) entry, err := routingTable.GetEntry(tc.req.GetPartitionKey()) + fmt.Println("entry", entry) require.NoError(t, err) require.NotNil(t, entry) require.Equal(t, tc.req.GetRelativePath(), entry.RelativePath) - require.Equal(t, tc.req.GetLeaderId(), entry.LeaderID) - require.Equal(t, uint64(1), entry.Term) - require.Equal(t, uint64(1), entry.Index) // Check that our replica is in the routing table found := slices.ContainsFunc(entry.Replicas, func(replica *gitalypb.ReplicaID) bool { @@ -295,6 +345,17 @@ func TestJoinCluster(t *testing.T) { require.Len(t, entry.Replicas, len(tc.req.GetReplicas())) + replicaRegistry := raftStorage.GetReplicaRegistry() + require.NotNil(t, replicaRegistry, "replica registry should not be nil") + + require.Eventually(t, func() bool { + replicaTwo, err := replicaRegistry.GetReplica(partitionKey) + if err != nil { + return false + } + return replicaTwo != nil + }, 5*time.Minute, 5*time.Millisecond, "replica should be created") + } else { testhelper.RequireGrpcCode(t, err, tc.expectedCode) require.Contains(t, err.Error(), tc.expectedError) @@ -327,7 +388,7 @@ func TestJoinCluster_MemberIDAlreadyExists(t *testing.T) { mockNode, err := raftmgr.NewNode(cfg, logger, dbMgr, nil) require.NoError(t, err) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) partitionKey := raftmgr.NewPartitionKey(testAuthorityName, 1) storage, err := mockNode.GetStorage(testStorageName) @@ -378,3 +439,20 @@ func TestJoinCluster_MemberIDAlreadyExists(t *testing.T) { testhelper.RequireGrpcCode(t, err, codes.InvalidArgument) require.Contains(t, err.Error(), "member ID 1 
already exists in the cluster") } + +func setupDB(t *testing.T, ctx context.Context, logger log.Logger, cfg config.Cfg) *databasemgr.DBManager { + dbPath := testhelper.TempDir(t) + dbMgr, err := databasemgr.NewDBManager( + ctx, + cfg.Storages, + func(logger log.Logger, path string) (keyvalue.Store, error) { + return keyvalue.NewBadgerStore(logger, filepath.Join(dbPath, path)) + }, + helper.NewTimerTickerFactory(time.Minute), + logger, + ) + + require.NoError(t, err) + + return dbMgr +} diff --git a/internal/gitaly/service/raft/send_message_test.go b/internal/gitaly/service/raft/send_message_test.go index 1b759b30364..aa9b8240824 100644 --- a/internal/gitaly/service/raft/send_message_test.go +++ b/internal/gitaly/service/raft/send_message_test.go @@ -60,7 +60,7 @@ func TestServer_SendMessage(t *testing.T) { replicaTwo := &mockRaftReplica{} registryTwo.RegisterReplica(partitionKey, replicaTwo) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) testCases := []struct { desc string diff --git a/internal/gitaly/service/raft/send_snapshot_test.go b/internal/gitaly/service/raft/send_snapshot_test.go index 2b17b7f4c55..3c424aac414 100644 --- a/internal/gitaly/service/raft/send_snapshot_test.go +++ b/internal/gitaly/service/raft/send_snapshot_test.go @@ -61,7 +61,7 @@ func TestServer_SendSnapshot_Success(t *testing.T) { partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) // Bypasses transport and directly invokes rpc via client stream, err := client.SendSnapshot(ctx) @@ -202,7 +202,7 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) // Bypasses transport and 
directly invokes rpc via client stream, err := client.SendSnapshot(ctx) diff --git a/internal/gitaly/service/raft/testhelper_test.go b/internal/gitaly/service/raft/testhelper_test.go index 8dcafadb5d2..38e185641e4 100644 --- a/internal/gitaly/service/raft/testhelper_test.go +++ b/internal/gitaly/service/raft/testhelper_test.go @@ -32,7 +32,7 @@ func (m *mockRaftReplica) IsStarted() bool { return true } -func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raftmgr.Node) gitalypb.RaftServiceClient { +func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raftmgr.Node) (string, gitalypb.RaftServiceClient) { serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { deps.Cfg = cfg deps.Node = node @@ -43,5 +43,18 @@ func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raft conn := gittest.DialService(t, ctx, cfg) - return gitalypb.NewRaftServiceClient(conn) + return serverSocketPath, gitalypb.NewRaftServiceClient(conn) +} + +func raftConfigsForTest(t *testing.T) config.Raft { + // Speed up initial election overhead in the test setup + return config.Raft{ + Enabled: true, + ClusterID: "test-cluster", + ElectionTicks: 5, + HeartbeatTicks: 2, + RTTMilliseconds: 100, + ProposalConfChangeTimeout: 1500, + SnapshotDir: testhelper.TempDir(t), + } } diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index d3ed554983c..a703ab3aa19 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -1209,6 +1209,23 @@ func (replica *Replica) joinCluster(ctx context.Context, targetMemberID uint64, return nil } +// GetLeaderID returns the leader ID of the replica. 
+func (replica *Replica) GetLeaderID() (uint64, error) { + if !replica.started { + return 0, fmt.Errorf("raft replica not started") + } + leaderID := replica.leadership.GetLeaderID() + return leaderID, nil +} + +// GetMemberID returns the member ID of the replica. +func (replica *Replica) GetMemberID() (uint64, error) { + if !replica.started { + return 0, fmt.Errorf("raft replica not started") + } + return replica.memberID, nil +} + func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) error { _, err := routingTable.Translate(replica.partitionKey, memberID) if err != nil { -- GitLab From 75f1d63bf58a481249eea2460b1aa6ab8cc93894 Mon Sep 17 00:00:00 2001 From: Divya Rani Date: Tue, 12 Aug 2025 23:34:45 +0530 Subject: [PATCH 8/8] raft: Add E2E test for AddNode operation Add end-to-end tests for the AddNode operation to verify that the replicas are started and become part of the cluster. Note that while the leader establishes initial communication, the destination replica remains in StateProbe mode due to some membership issues which are yet to be resolved. This MR focuses only on bootstrapping and setting up initial communication. 
--- .../service/raft/replica_bootstrap_test.go | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 internal/gitaly/service/raft/replica_bootstrap_test.go diff --git a/internal/gitaly/service/raft/replica_bootstrap_test.go b/internal/gitaly/service/raft/replica_bootstrap_test.go new file mode 100644 index 00000000000..dc94eb43715 --- /dev/null +++ b/internal/gitaly/service/raft/replica_bootstrap_test.go @@ -0,0 +1,124 @@ +package raft + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/raftmgr" + partitionlog "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/log" + "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" + "google.golang.org/grpc" +) + +const ( + storageOne = "storage-one" + storageTwo = "storage-two" +) + +var partitionKey *gitalypb.RaftPartitionKey + +func createRaftReplica(t *testing.T, ctx context.Context, partitionID storage.PartitionID, opts ...raftmgr.OptionFunc) (*raftmgr.Replica, error) { + metrics := raftmgr.NewMetrics() + + cfg := testcfg.Build(t, testcfg.WithStorages(storageOne)) + cfg.Raft = raftConfigsForTest(t) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRaftServiceServer(srv, NewServer(deps)) + }, testserver.WithDisablePraefect()) + + logger := testhelper.NewLogger(t) + storageName := storageOne + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + posTracker := partitionlog.NewPositionTracker() + + dbMgr := setupDB(t, ctx, 
logger, cfg) + t.Cleanup(dbMgr.Close) + + db, err := dbMgr.GetDB(storageName) + require.NoError(t, err) + + logStore, err := raftmgr.NewReplicaLogStore(storageName, partitionID, cfg.Raft, db, stagingDir, stateDir, &raftmgr.MockConsumer{}, posTracker, logger, metrics) + require.NoError(t, err) + + conns := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())) + t.Cleanup(func() { + err := conns.Close() + require.NoError(t, err) + }) + + raftNode, err := raftmgr.NewNode(cfg, logger, dbMgr, conns) + require.NoError(t, err) + + raftFactory := raftmgr.DefaultFactoryWithNode(cfg.Raft, raftNode, opts...) + partitionKey = raftmgr.NewPartitionKey(storageName, partitionID) + ctx = storage.ContextWithPartitionInfo(ctx, partitionKey, 1, "gitaly.git") + manager, err := raftFactory(ctx, storageName, logStore, logger, metrics) + + return manager, err +} + +func TestRaftReplicaCreation(t *testing.T) { + t.Parallel() + ctxOne := testhelper.Context(t) + replicaOne, err := createRaftReplica(t, ctxOne, 1) + require.NoError(t, err) + require.NoError(t, replicaOne.Initialize(ctxOne, 0)) + t.Cleanup(func() { + err := replicaOne.Close() + require.NoError(t, err) + }) + // Wait for the replica to elect itself as leader + require.Eventually(t, func() bool { + return replicaOne.AppendedLSN() > 1 + }, 10*time.Second, 5*time.Millisecond, "replica should become leader") + + nodeTwo, cfg, err := createRaftNodeWithStorage(t, storageTwo) + require.NoError(t, err) + + err = replicaOne.AddNode(ctxOne, cfg.SocketPath, storageTwo) + if err != nil { + require.NoError(t, err) + } + + storageHandle, err := nodeTwo.GetStorage(storageTwo) + require.NoError(t, err) + raftEnabledStorage := storageHandle.(*raftmgr.RaftEnabledStorage) + require.NotNil(t, raftEnabledStorage, "storage should be a RaftEnabledStorage") + + replicaRegistry := raftEnabledStorage.GetReplicaRegistry() + require.NotNil(t, replicaRegistry, "replica registry should not be nil") + + var replicaTwo 
raftmgr.RaftReplica + require.Eventually(t, func() bool { + replicaTwo, err = replicaRegistry.GetReplica(partitionKey) + if err != nil { + return false + } + return replicaTwo != nil + }, 5*time.Minute, 5*time.Millisecond, "replica should be created") + + waitForReplicaLeader := func(t *testing.T, replicaOne, replicaTwo *raftmgr.Replica, timeout time.Duration) { + require.Eventually(t, func() bool { + leaderID, err := replicaTwo.GetLeaderID() + if err != nil { + return false + } + memberID, err := replicaOne.GetMemberID() + if err != nil { + return false + } + return leaderID == memberID + }, timeout, 5*time.Millisecond, "replica two should have the same leader as replica one") + } + + waitForReplicaLeader(t, replicaOne, replicaTwo.(*raftmgr.Replica), 5*time.Minute) +} -- GitLab