diff --git a/internal/gitaly/service/raft/get_partitions_test.go b/internal/gitaly/service/raft/get_partitions_test.go index 7b6e6058036e141608f9ec6739561595f6364b9d..e90e140128fdfc063b8e7146ce94769c52f73acc 100644 --- a/internal/gitaly/service/raft/get_partitions_test.go +++ b/internal/gitaly/service/raft/get_partitions_test.go @@ -371,6 +371,7 @@ func TestServer_GetClusterInfo_LiveStateVsRoutingTable(t *testing.T) { ctx, 1, // memberID partitionKey, + "@hashed/test/repo.git", cfg.Raft, logStore, raftStorage, diff --git a/internal/gitaly/service/raft/join_cluster.go b/internal/gitaly/service/raft/join_cluster.go index 31ec54459bf7eabb2a51c86559928360ddec5c69..77e259df4a5ed4208a4ab422574e34a1a38c7f76 100644 --- a/internal/gitaly/service/raft/join_cluster.go +++ b/internal/gitaly/service/raft/join_cluster.go @@ -2,8 +2,10 @@ package raft import ( "context" + "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v18/internal/structerr" "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" @@ -40,14 +42,24 @@ func (s *Server) JoinCluster(ctx context.Context, req *gitalypb.JoinClusterReque RelativePath: req.GetRelativePath(), Replicas: req.GetReplicas(), LeaderID: req.GetLeaderId(), - Term: req.GetTerm(), - Index: req.GetIndex(), } if err := routingTable.UpsertEntry(*routingEntry); err != nil { return nil, structerr.NewInternal("failed to update routing table: %w", err) } + ctx = storage.ContextWithPartitionInfo(ctx, req.GetPartitionKey(), req.GetMemberId(), req.GetRelativePath()) + + replicaRegistry := raftEnabledStorage.GetReplicaRegistry() + raftReplica, err := s.createReplicaViaTransaction(ctx, req.GetRelativePath(), raftEnabledStorage, replicaRegistry, req.GetPartitionKey()) + if err != nil { + return &gitalypb.JoinClusterResponse{}, structerr.NewInternal("failed to create replica: %w", err) + } + + if !raftReplica.IsStarted() { + return &gitalypb.JoinClusterResponse{}, structerr.NewInternal("replica is not started") + } + return nil, nil } @@ -90,3 +102,38 @@ func (s *Server) validateJoinClusterRequest(req *gitalypb.JoinClusterRequest) er return nil } + +func (s *Server) createReplicaViaTransaction(ctx context.Context, relativePath string, storageManager storage.Storage, replicaRegistry raftmgr.ReplicaRegistry, partitionKey *gitalypb.RaftPartitionKey) (_ raftmgr.RaftReplica, returnedErr error) { + tx, err := storageManager.Begin(ctx, storage.TransactionOptions{ + RelativePath: relativePath, + AllowPartitionAssignmentWithoutRepository: true, + }) + if err != nil { + return nil, fmt.Errorf("begin bootstrap transaction: %w", err) + } + + replica, err := replicaRegistry.GetReplica(partitionKey) + if err != nil { + return nil, fmt.Errorf("replica not found after partition creation: %w", err) + } + + started := replica.(*raftmgr.Replica).IsStarted() + if !started { + return nil, fmt.Errorf("replica is not started") + } + + defer func() { + if returnedErr != nil { + if err := tx.Rollback(ctx); err != nil { + returnedErr = errors.Join(err, fmt.Errorf("rollback: %w", err)) + } + } else { + commitLSN, err := tx.Commit(ctx) + if err != nil { + returnedErr = errors.Join(err, fmt.Errorf("fail to commit transaction: commit LSN: %d: %w", commitLSN, err)) + } + } + }() + + return replica, nil +} diff --git a/internal/gitaly/service/raft/join_cluster_test.go b/internal/gitaly/service/raft/join_cluster_test.go index 6bf04afbc08852ebc0faa88f8e9f6dea9ba5acee..fe41aede2a8267fa4d487c1ed22a234ac9678f1e 100644 --- a/internal/gitaly/service/raft/join_cluster_test.go +++ b/internal/gitaly/service/raft/join_cluster_test.go @@ -1,57 +1,109 @@ package raft import ( + "context" + "fmt" "path/filepath" "slices" "testing" + "time" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue/databasemgr" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/raftmgr" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v18/internal/helper" "gitlab.com/gitlab-org/gitaly/v18/internal/log" "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" + "google.golang.org/grpc" "google.golang.org/grpc/codes" ) const ( testClusterID = "test-cluster" - testAuthorityName = "test-authority" testStorageName = "default" + testAuthorityName = "test-authority" testMemberID = uint64(3) - testRelativePath = "relative/path/to/repo" + testRelativePath = "relative/path/to/repo.git" ) -func TestJoinCluster(t *testing.T) { - t.Parallel() - +// createRaftNodeWithStorage creates a Raft enabled Gitaly node with a base storage. +func createRaftNodeWithStorage(t *testing.T, storageName string) (*raftmgr.Node, config.Cfg, error) { ctx := testhelper.Context(t) - cfg := testcfg.Build(t, testcfg.WithStorages(testStorageName)) - cfg.Raft.ClusterID = testClusterID logger := testhelper.SharedLogger(t) - dbPath := testhelper.TempDir(t) - dbMgr, err := databasemgr.NewDBManager( - ctx, - cfg.Storages, - func(logger log.Logger, path string) (keyvalue.Store, error) { - return keyvalue.NewBadgerStore(logger, filepath.Join(dbPath, path)) - }, - helper.NewNullTickerFactory(), - logger, - ) - require.NoError(t, err) + cfg := testcfg.Build(t, testcfg.WithStorages(storageName)) + cfg.Raft = raftConfigsForTest(t) + + dbMgr := setupDB(t, ctx, logger, cfg) t.Cleanup(dbMgr.Close) - mockNode, err := raftmgr.NewNode(cfg, logger, dbMgr, nil) + conns := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())) + t.Cleanup(func() { + err := conns.Close() + require.NoError(t, err) + }) + + nodeTwo, err := raftmgr.NewNode(cfg, logger, dbMgr, conns) + require.NoError(t, err) + + metrics := storagemgr.NewMetrics(cfg.Prometheus) + gitCmdFactory := gittest.NewCommandFactory(t, cfg) + locator := config.NewLocator(cfg) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + partitionFactoryOptions := []partition.FactoryOption{ + partition.WithRaftConfig(cfg.Raft), + partition.WithRaftFactory(raftmgr.DefaultFactoryWithNode(cfg.Raft, nodeTwo)), + partition.WithCmdFactory(gitCmdFactory), + partition.WithRepoFactory(localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache)), + partition.WithMetrics(partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus))), + } + nodeMgr, err := node.NewManager(cfg.Storages, storagemgr.NewFactory(logger, dbMgr, partition.NewFactory(partitionFactoryOptions...), 2, metrics)) require.NoError(t, err) + t.Cleanup(nodeMgr.Close) - client := runRaftServer(t, ctx, cfg, mockNode) + // Setup the base storage for the node two to support running transactions. + for _, storageCfg := range cfg.Storages { + baseStorage, err := nodeMgr.GetStorage(storageCfg.Name) + require.NoError(t, err) + require.NoError(t, nodeTwo.SetBaseStorage(storageCfg.Name, baseStorage)) + } - partitionKey := raftmgr.NewPartitionKey(testAuthorityName, 1) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { + deps.Cfg = cfg + deps.Node = nodeTwo + gitalypb.RegisterRaftServiceServer(srv, NewServer(deps)) + }, testserver.WithDisablePraefect()) + + return nodeTwo, cfg, nil +} + +func TestJoinCluster(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + partitionKey := raftmgr.NewPartitionKey(testStorageName, 1) + + // create a second Gitaly server + node, cfg, err := createRaftNodeWithStorage(t, testStorageName) + require.NoError(t, err) + conn := gittest.DialService(t, ctx, cfg) + clientTwo := gitalypb.NewRaftServiceClient(conn) + require.NoError(t, err) testCases := []struct { desc string @@ -87,7 +139,7 @@ func TestJoinCluster(t *testing.T) { Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, - MemberId: 1, + MemberId: 2, StorageName: testStorageName, Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, }, @@ -252,13 +304,13 @@ func TestJoinCluster(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { t.Parallel() - resp, err := client.JoinCluster(ctx, tc.req) + resp, err := clientTwo.JoinCluster(ctx, tc.req) if tc.expectedCode == codes.OK { require.NoError(t, err) require.NotNil(t, resp) - storage, err := mockNode.GetStorage(testStorageName) + storage, err := node.GetStorage(testStorageName) require.NoError(t, err) raftStorage := storage.(*raftmgr.RaftEnabledStorage) @@ -266,13 +318,11 @@ func TestJoinCluster(t *testing.T) { require.NotNil(t, routingTable) entry, err := routingTable.GetEntry(tc.req.GetPartitionKey()) + fmt.Println("entry", entry) require.NoError(t, err) require.NotNil(t, entry) require.Equal(t, tc.req.GetRelativePath(), entry.RelativePath) - require.Equal(t, tc.req.GetLeaderId(), entry.LeaderID) - require.Equal(t, uint64(1), entry.Term) - require.Equal(t, uint64(1), entry.Index) // Check that our replica is in the routing table found := slices.ContainsFunc(entry.Replicas, func(replica *gitalypb.ReplicaID) bool { @@ -295,6 +345,17 @@ func TestJoinCluster(t *testing.T) { require.Len(t, entry.Replicas, len(tc.req.GetReplicas())) + replicaRegistry := raftStorage.GetReplicaRegistry() + require.NotNil(t, replicaRegistry, "replica registry should not be nil") + + require.Eventually(t, func() bool { + replicaTwo, err := replicaRegistry.GetReplica(partitionKey) + if err != nil { + return false + } + return replicaTwo != nil + }, 5*time.Minute, 5*time.Millisecond, "replica should be created") + } else { testhelper.RequireGrpcCode(t, err, tc.expectedCode) require.Contains(t, err.Error(), tc.expectedError) @@ -327,7 +388,7 @@ func TestJoinCluster_MemberIDAlreadyExists(t *testing.T) { mockNode, err := raftmgr.NewNode(cfg, logger, dbMgr, nil) require.NoError(t, err) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) partitionKey := raftmgr.NewPartitionKey(testAuthorityName, 1) storage, err := mockNode.GetStorage(testStorageName) @@ -378,3 +439,20 @@ func TestJoinCluster_MemberIDAlreadyExists(t *testing.T) { testhelper.RequireGrpcCode(t, err, codes.InvalidArgument) require.Contains(t, err.Error(), "member ID 1 already exists in the cluster") } + +func setupDB(t *testing.T, ctx context.Context, logger log.Logger, cfg config.Cfg) *databasemgr.DBManager { + dbPath := testhelper.TempDir(t) + dbMgr, err := databasemgr.NewDBManager( + ctx, + cfg.Storages, + func(logger log.Logger, path string) (keyvalue.Store, error) { + return keyvalue.NewBadgerStore(logger, filepath.Join(dbPath, path)) + }, + helper.NewTimerTickerFactory(time.Minute), + logger, + ) + + require.NoError(t, err) + + return dbMgr +} diff --git a/internal/gitaly/service/raft/replica_bootstrap_test.go b/internal/gitaly/service/raft/replica_bootstrap_test.go new file mode 100644 index 0000000000000000000000000000000000000000..dc94eb437157d3d97339f5ffbe4a6790913675c6 --- /dev/null +++ b/internal/gitaly/service/raft/replica_bootstrap_test.go @@ -0,0 +1,124 @@ +package raft + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/raftmgr" + partitionlog "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/log" + "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" + "google.golang.org/grpc" +) + +const ( + storageOne = "storage-one" + storageTwo = "storage-two" +) + +var partitionKey *gitalypb.RaftPartitionKey + +func createRaftReplica(t *testing.T, ctx context.Context, partitionID storage.PartitionID, opts ...raftmgr.OptionFunc) (*raftmgr.Replica, error) { + metrics := raftmgr.NewMetrics() + + cfg := testcfg.Build(t, testcfg.WithStorages(storageOne)) + cfg.Raft = raftConfigsForTest(t) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRaftServiceServer(srv, NewServer(deps)) + }, testserver.WithDisablePraefect()) + + logger := testhelper.NewLogger(t) + storageName := storageOne + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + posTracker := partitionlog.NewPositionTracker() + + dbMgr := setupDB(t, ctx, logger, cfg) + t.Cleanup(dbMgr.Close) + + db, err := dbMgr.GetDB(storageName) + require.NoError(t, err) + + logStore, err := raftmgr.NewReplicaLogStore(storageName, partitionID, cfg.Raft, db, stagingDir, stateDir, &raftmgr.MockConsumer{}, posTracker, logger, metrics) + require.NoError(t, err) + + conns := client.NewPool(client.WithDialOptions(client.UnaryInterceptor(), client.StreamInterceptor())) + t.Cleanup(func() { + err := conns.Close() + require.NoError(t, err) + }) + + raftNode, err := raftmgr.NewNode(cfg, logger, dbMgr, conns) + require.NoError(t, err) + + raftFactory := raftmgr.DefaultFactoryWithNode(cfg.Raft, raftNode, opts...) + partitionKey = raftmgr.NewPartitionKey(storageName, partitionID) + ctx = storage.ContextWithPartitionInfo(ctx, partitionKey, 1, "gitaly.git") + manager, err := raftFactory(ctx, storageName, logStore, logger, metrics) + + return manager, err +} + +func TestRaftReplicaCreation(t *testing.T) { + t.Parallel() + ctxOne := testhelper.Context(t) + replicaOne, err := createRaftReplica(t, ctxOne, 1) + require.NoError(t, err) + require.NoError(t, replicaOne.Initialize(ctxOne, 0)) + t.Cleanup(func() { + err := replicaOne.Close() + require.NoError(t, err) + }) + // Wait for the replica to elect itself as leader + require.Eventually(t, func() bool { + return replicaOne.AppendedLSN() > 1 + }, 10*time.Second, 5*time.Millisecond, "replica should become leader") + + nodeTwo, cfg, err := createRaftNodeWithStorage(t, storageTwo) + require.NoError(t, err) + + err = replicaOne.AddNode(ctxOne, cfg.SocketPath, storageTwo) + if err != nil { + require.NoError(t, err) + } + + storageHandle, err := nodeTwo.GetStorage(storageTwo) + require.NoError(t, err) + raftEnabledStorage := storageHandle.(*raftmgr.RaftEnabledStorage) + require.NotNil(t, raftEnabledStorage, "storage should be a RaftEnabledStorage") + + replicaRegistry := raftEnabledStorage.GetReplicaRegistry() + require.NotNil(t, replicaRegistry, "replica registry should not be nil") + + var replicaTwo raftmgr.RaftReplica + require.Eventually(t, func() bool { + replicaTwo, err = replicaRegistry.GetReplica(partitionKey) + if err != nil { + return false + } + return replicaTwo != nil + }, 5*time.Minute, 5*time.Millisecond, "replica should be created") + + waitForReplicaLeader := func(t *testing.T, replicaOne, replicaTwo *raftmgr.Replica, timeout time.Duration) { + require.Eventually(t, func() bool { + leaderID, err := replicaTwo.GetLeaderID() + if err != nil { + return false + } + memberID, err := replicaOne.GetMemberID() + if err != nil { + return false + } + return leaderID == memberID + }, timeout, 5*time.Millisecond, "replica two should have the same leader as replica one") + } + + waitForReplicaLeader(t, replicaOne, replicaTwo.(*raftmgr.Replica), 5*time.Minute) +} diff --git a/internal/gitaly/service/raft/send_message.go b/internal/gitaly/service/raft/send_message.go index af3dd00040b6186e5096054e3f724499548062c2..da97b8fb0e9f3eca40eec7ca3c128085694afeaa 100644 --- a/internal/gitaly/service/raft/send_message.go +++ b/internal/gitaly/service/raft/send_message.go @@ -33,7 +33,7 @@ func (s *Server) SendMessage(stream gitalypb.RaftService_SendMessageServer) erro storageManager, err := node.GetStorage(storageName) if err != nil { - return structerr.NewInternal("get storage manager: %w", err) + return structerr.NewInternal("get storage manager: storage name %s, %w", storageName, err) } raftStorage, ok := storageManager.(*raftmgr.RaftEnabledStorage) diff --git a/internal/gitaly/service/raft/send_message_test.go b/internal/gitaly/service/raft/send_message_test.go index 1b759b303647d642c65862ef9a6417857b52228f..aa9b824082499ab593d6af8a092f8e7be3c954fd 100644 --- a/internal/gitaly/service/raft/send_message_test.go +++ b/internal/gitaly/service/raft/send_message_test.go @@ -60,7 +60,7 @@ func TestServer_SendMessage(t *testing.T) { replicaTwo := &mockRaftReplica{} registryTwo.RegisterReplica(partitionKey, replicaTwo) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) testCases := []struct { desc string diff --git a/internal/gitaly/service/raft/send_snapshot_test.go b/internal/gitaly/service/raft/send_snapshot_test.go index 2b17b7f4c55bccb278215a7c96c15b0331c48a05..3c424aac414e885a6ce4450395fad5b4da78299e 100644 --- a/internal/gitaly/service/raft/send_snapshot_test.go +++ b/internal/gitaly/service/raft/send_snapshot_test.go @@ -61,7 +61,7 @@ func TestServer_SendSnapshot_Success(t *testing.T) { partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) // Bypasses transport and directly invokes rpc via client stream, err := client.SendSnapshot(ctx) @@ -202,7 +202,7 @@ func TestServer_SendSnapshot_Errors(t *testing.T) { partitionKey := raftmgr.NewPartitionKey(authorityName, 1) registry.RegisterReplica(partitionKey, replica) - client := runRaftServer(t, ctx, cfg, mockNode) + _, client := runRaftServer(t, ctx, cfg, mockNode) // Bypasses transport and directly invokes rpc via client stream, err := client.SendSnapshot(ctx) diff --git a/internal/gitaly/service/raft/testhelper_test.go b/internal/gitaly/service/raft/testhelper_test.go index c9d03a6fa02b186ad4377982d640e333cec4ba88..38e185641e4abb05385c3de37fe5e26e93f888cf 100644 --- a/internal/gitaly/service/raft/testhelper_test.go +++ b/internal/gitaly/service/raft/testhelper_test.go @@ -28,7 +28,11 @@ func (m *mockRaftReplica) Step(ctx context.Context, msg raftpb.Message) error { return nil } -func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raftmgr.Node) gitalypb.RaftServiceClient { +func (m *mockRaftReplica) IsStarted() bool { + return true +} + +func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raftmgr.Node) (string, gitalypb.RaftServiceClient) { serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { deps.Cfg = cfg deps.Node = node @@ -39,5 +43,18 @@ func runRaftServer(t *testing.T, ctx context.Context, cfg config.Cfg, node *raft conn := gittest.DialService(t, ctx, cfg) - return gitalypb.NewRaftServiceClient(conn) + return serverSocketPath, gitalypb.NewRaftServiceClient(conn) +} + +func raftConfigsForTest(t *testing.T) config.Raft { + // Speed up initial election overhead in the test setup + return config.Raft{ + Enabled: true, + ClusterID: "test-cluster", + ElectionTicks: 5, + HeartbeatTicks: 2, + RTTMilliseconds: 100, + ProposalConfChangeTimeout: 1500, + SnapshotDir: testhelper.TempDir(t), + } } diff --git a/internal/gitaly/storage/context.go b/internal/gitaly/storage/context.go index dc076310c9d0a2d6ddcba1f59f48d2168512b41d..91786820e7532f41d8f3376c3db42111501aad6b 100644 --- a/internal/gitaly/storage/context.go +++ b/internal/gitaly/storage/context.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" "google.golang.org/grpc/metadata" ) @@ -47,6 +48,35 @@ func ExtractTransaction(ctx context.Context) Transaction { return value.(Transaction) } +type keyPartitionInfo struct{} + +type partitionInfo struct { + PartitionKey *gitalypb.RaftPartitionKey + MemberID uint64 + RelativePath string +} + +// ContextWithPartitionInfo stores the partition info into the context. +// This is used to pass the original partition info to the partition factory. +func ContextWithPartitionInfo(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, memberID uint64, relativePath string) context.Context { + return context.WithValue(ctx, keyPartitionInfo{}, partitionInfo{ + PartitionKey: partitionKey, + MemberID: memberID, + RelativePath: relativePath, + }) +} + +// ExtractPartitionInfo extracts the partition info from the context. Nil is returned if there's +// no partition info in the context. +func ExtractPartitionInfo(ctx context.Context) partitionInfo { + value := ctx.Value(keyPartitionInfo{}) + if value == nil { + return partitionInfo{} + } + + return value.(partitionInfo) +} + const keyPartitioningHint = "gitaly-partitioning-hint" // ContextWithPartitioningHint stores the relativePath as a partitioning hint into the incoming diff --git a/internal/gitaly/storage/raftmgr/grpc_transport.go b/internal/gitaly/storage/raftmgr/grpc_transport.go index fb05226114fa9f80ee37ebe8b14194a1bf005daf..17816c095dce1f6fea5a393cbd6b3095f383c9e9 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport.go @@ -131,27 +131,20 @@ func (t *GrpcTransport) prepareRaftMessageRequests(ctx context.Context, logReade t.mutex.Unlock() } - replica, err := t.routingTable.Translate(partitionKey, msg.To) + replicaID, err := t.routingTable.Translate(partitionKey, msg.To) if err != nil { return fmt.Errorf("translate memberID %d: %w", msg.To, err) } - addr := replica.GetMetadata().GetAddress() + addr := replicaID.GetMetadata().GetAddress() messagesByAddressMutex.Lock() // We are not adding address in the request because it will increase the payload size, and // is not needed on the receiver end. messagesByAddress[addr] = append(messagesByAddress[addr], &gitalypb.RaftMessageRequest{ ClusterId: t.cfg.Raft.ClusterID, - ReplicaId: &gitalypb.ReplicaID{ - PartitionKey: partitionKey, - MemberId: msg.To, - StorageName: replica.GetStorageName(), - Metadata: &gitalypb.ReplicaID_Metadata{ - Address: replica.GetMetadata().GetAddress(), - }, - }, - Message: &msg, + ReplicaId: replicaID, + Message: &msg, }) messagesByAddressMutex.Unlock() return nil @@ -213,13 +206,13 @@ func (t *GrpcTransport) packLogData(ctx context.Context, lsn storage.LSN, messag // Receive receives a stream of Raft messages and processes them. func (t *GrpcTransport) Receive(ctx context.Context, partitionKey *gitalypb.RaftPartitionKey, raftMsg raftpb.Message) error { // Retrieve the replica from the registry, assumption is that all the messages are from the same partition key. + var replica RaftReplica replica, err := t.registry.GetReplica(partitionKey) if err != nil { - // If the replica is not found, return nil until we add a way to bootstrap the replica - https://gitlab.com/gitlab-org/gitaly/-/issues/6304 - if errors.Is(err, errNoReplicaFound) { - return nil - } - return err + t.logger.WithFields(log.Fields{ + "raft.partition_key": partitionKey.GetValue(), + }).Error("replica has not been created yet") + return nil } for _, entry := range raftMsg.Entries { diff --git a/internal/gitaly/storage/raftmgr/mock.go b/internal/gitaly/storage/raftmgr/mock.go new file mode 100644 index 0000000000000000000000000000000000000000..521ca49d915a9ec664920662c63bc78c85b33074 --- /dev/null +++ b/internal/gitaly/storage/raftmgr/mock.go @@ -0,0 +1,39 @@ +package raftmgr + +import ( + "sync" + + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" +) + +// MockConsumer is a mock implementation of the LogConsumer interface. +type MockConsumer struct { + notifications []mockNotification + mutex sync.Mutex +} + +type mockNotification struct { + storageName string + partitionID storage.PartitionID + lowWaterMark storage.LSN + highWaterMark storage.LSN +} + +// NotifyNewEntries is a mock implementation of the LogConsumer.NotifyNewEntries method. +func (mc *MockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { + mc.mutex.Lock() + defer mc.mutex.Unlock() + mc.notifications = append(mc.notifications, mockNotification{ + storageName: storageName, + partitionID: partitionID, + lowWaterMark: lowWaterMark, + highWaterMark: committedLSN, + }) +} + +// GetNotifications is a mock implementation of the LogConsumer.GetNotifications method. +func (mc *MockConsumer) GetNotifications() []mockNotification { + mc.mutex.Lock() + defer mc.mutex.Unlock() + return mc.notifications +} diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index 68ae6faaed4b4aad62875b6d3338637c60a1cdfe..a703ab3aa19548068595d8ee11068079bd500992 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -56,13 +56,16 @@ type RaftReplica interface { // RemoveNode removes a node from the Raft cluster. // This operation can only be performed by the leader. - RemoveNode(ctx context.Context, memberID uint64) error + RemoveNode(ctx context.Context, memberID uint64, destinationStorageName string) error // AddLearner adds a new non-voting learner node to the Raft cluster. // Learner nodes receive log entries but don't participate in elections. // This is typically used to bring new nodes up to speed before promoting them to voters. // This operation can only be performed by the leader. - AddLearner(ctx context.Context, address string) error + AddLearner(ctx context.Context, address, destinationStorageName string) error + + // IsStarted returns true if the replica has been started. + IsStarted() bool // GetCurrentState returns comprehensive current state information including term, index, and Raft state. // This provides an efficient way to get multiple state values with consistent locking. @@ -234,9 +237,7 @@ func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ReplicaOptions, error // This factory is used to create and initialize Replica objects for partitions. type RaftReplicaFactory func( ctx context.Context, - memberID uint64, storageName string, - partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, @@ -247,24 +248,26 @@ type RaftReplicaFactory func( func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionFunc) RaftReplicaFactory { return func( ctx context.Context, - memberID uint64, storageName string, - partitionKey *gitalypb.RaftPartitionKey, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, ) (*Replica, error) { - storage, err := raftNode.GetStorage(storageName) + raftStorage, err := raftNode.GetStorage(storageName) if err != nil { return nil, fmt.Errorf("get storage %q: %w", storageName, err) } - raftEnabledStorage, ok := storage.(*RaftEnabledStorage) + raftEnabledStorage, ok := raftStorage.(*RaftEnabledStorage) if !ok { return nil, fmt.Errorf("storage %q is not a RaftEnabledStorage", storageName) } + partitionInfo := storage.ExtractPartitionInfo(ctx) + partitionKey := partitionInfo.PartitionKey + memberID := partitionInfo.MemberID + relativePath := partitionInfo.RelativePath - replica, err := NewReplica(ctx, memberID, partitionKey, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) + replica, err := NewReplica(ctx, memberID, partitionKey, relativePath, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) if err != nil { return nil, fmt.Errorf("create replica %q: %w", storageName, err) } @@ -286,6 +289,7 @@ func NewReplica( ctx context.Context, memberID uint64, partitionKey *gitalypb.RaftPartitionKey, + relativePath string, raftCfg config.Raft, logStore *ReplicaLogStore, raftEnabledStorage *RaftEnabledStorage, @@ -297,11 +301,6 @@ func NewReplica( return nil, fmt.Errorf("raft is not enabled") } - relativePath, err := storage.ExtractPartitioningHint(ctx) - if err != nil { - return nil, fmt.Errorf("extract partitioning hint: %w", err) - } - options, err := applyOptions(raftCfg, opts) if err != nil { return nil, fmt.Errorf("invalid raft replica option: %w", err) @@ -929,7 +928,7 @@ func (replica *Replica) processConfChange(entry raftpb.Entry) error { routingTable := replica.raftEnabledStorage.GetRoutingTable() // Apply the changes to the routing table - if err := routingTable.ApplyReplicaConfChange(replica.logStore.storageName, replica.partitionKey, replicaChanges); err != nil { + if err := routingTable.ApplyReplicaConfChange(replica.partitionKey, replicaChanges); err != nil { return fmt.Errorf("applying conf changes: %w", err) } @@ -1018,6 +1017,11 @@ func (replica *Replica) signalError(err error) { replica.ready.set(err) } +// IsStarted implements RaftReplica.IsStarted +func (replica *Replica) IsStarted() bool { + return replica.started +} + // Step processes a Raft message from a remote node func (replica *Replica) Step(ctx context.Context, msg raftpb.Message) error { if !replica.started { @@ -1031,19 +1035,20 @@ func (replica *Replica) Step(ctx context.Context, msg raftpb.Message) error { func (replica *Replica) AddNode(ctx context.Context, address, destinationStorageName string) error { memberID := uint64(replica.AppendedLSN() + 1) return replica.proposeMembershipChange(ctx, string(addVoter), destinationStorageName, memberID, ConfChangeAddNode, &gitalypb.ReplicaID_Metadata{ - Address: address, + Address: address, + RelativePath: replica.relativePath, }) } // RemoveNode implements RaftReplica.RemoveNode -func (replica *Replica) RemoveNode(ctx context.Context, memberID uint64) error { - return replica.proposeMembershipChange(ctx, string(removeNode), "", memberID, ConfChangeRemoveNode, nil) +func (replica *Replica) RemoveNode(ctx context.Context, memberID uint64, destinationStorageName string) error { + return replica.proposeMembershipChange(ctx, string(removeNode), destinationStorageName, memberID, ConfChangeRemoveNode, nil) } // AddLearner implements RaftReplica.AddLearner -func (replica *Replica) AddLearner(ctx context.Context, address string) error { +func (replica *Replica) AddLearner(ctx context.Context, address, destinationStorageName string) error { memberID := uint64(replica.AppendedLSN() + 1) - return replica.proposeMembershipChange(ctx, string(addLearner), "", memberID, ConfChangeAddLearnerNode, &gitalypb.ReplicaID_Metadata{ + return replica.proposeMembershipChange(ctx, string(addLearner), destinationStorageName, memberID, ConfChangeAddLearnerNode, &gitalypb.ReplicaID_Metadata{ Address: address, }) } @@ -1086,12 +1091,12 @@ func (replica *Replica) proposeRemoveNode(ctx context.Context, changeType string return err } - return replica.proposeConfChange(ctx, changeType, memberID, ConfChangeRemoveNode, nil) + return replica.proposeConfChange(ctx, changeType, memberID, "", ConfChangeRemoveNode, nil) } func (replica *Replica) proposeAddNode(ctx context.Context, changeType, destinationStorageName string, memberID uint64, metadata *gitalypb.ReplicaID_Metadata) (returnedErr error) { // First, propose the configuration change - if err := replica.proposeConfChange(ctx, changeType, memberID, ConfChangeAddNode, metadata); err != nil { + if err := replica.proposeConfChange(ctx, changeType, memberID, destinationStorageName, ConfChangeAddNode, metadata); err != nil { return err } @@ -1100,7 +1105,7 @@ func (replica *Replica) proposeAddNode(ctx context.Context, changeType, destinat // If join fails, attempt to remove the node from the cluster replica.logger.WithError(err).Warn("join cluster failed, attempting to remove node") - if removeErr := replica.proposeConfChange(ctx, string(removeNode), memberID, ConfChangeRemoveNode, nil); removeErr != nil { + if removeErr := replica.proposeConfChange(ctx, string(removeNode), memberID, "", ConfChangeRemoveNode, nil); removeErr != nil { replica.logger.WithError(removeErr).Error("failed to remove node after join failure") } @@ -1111,13 +1116,14 @@ func (replica *Replica) proposeAddNode(ctx context.Context, changeType, destinat } func (replica *Replica) proposeAddLearner(ctx context.Context, changeType string, memberID uint64, metadata *gitalypb.ReplicaID_Metadata) error { - return replica.proposeConfChange(ctx, changeType, memberID, ConfChangeAddLearnerNode, metadata) + return replica.proposeConfChange(ctx, changeType, memberID, "", ConfChangeAddLearnerNode, metadata) } func (replica *Replica) proposeConfChange( ctx context.Context, changeType string, memberID uint64, + destinationStorageName string, confChangeType ConfChangeType, metadata *gitalypb.ReplicaID_Metadata, ) (returnedErr error) { @@ -1132,6 +1138,7 @@ func (replica *Replica) proposeConfChange( replica.node.Status().Term, uint64(replica.AppendedLSN()), replica.leadership.GetLeaderID(), + destinationStorageName, waiter.ID, metadata, ) @@ -1202,6 +1209,23 @@ func (replica *Replica) joinCluster(ctx context.Context, targetMemberID uint64, return nil } +// GetLeaderID returns the leader ID of the replica. +func (replica *Replica) GetLeaderID() (uint64, error) { + if !replica.started { + return 0, fmt.Errorf("raft replica not started") + } + leaderID := replica.leadership.GetLeaderID() + return leaderID, nil +} + +// GetMemberID returns the member ID of the replica. +func (replica *Replica) GetMemberID() (uint64, error) { + if !replica.started { + return 0, fmt.Errorf("raft replica not started") + } + return replica.memberID, nil +} + func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) error { _, err := routingTable.Translate(replica.partitionKey, memberID) if err != nil { diff --git a/internal/gitaly/storage/raftmgr/replica_conf_change.go b/internal/gitaly/storage/raftmgr/replica_conf_change.go index 354ecf19e21c4fc4096aa950003a7d684656bd0f..07ac66cd251b65dd49286a4b70f1628cc1fed9c2 100644 --- a/internal/gitaly/storage/raftmgr/replica_conf_change.go +++ b/internal/gitaly/storage/raftmgr/replica_conf_change.go @@ -29,18 +29,20 @@ type ReplicaConfChange struct { // a consistent interface for configuration changes regardless of the underlying // implementation (ConfChange or ConfChangeV2). type ReplicaConfChanges struct { - changes []ReplicaConfChange - metadata *gitalypb.ReplicaID_Metadata - term uint64 - index uint64 - leaderID uint64 - eventID EventID + changes []ReplicaConfChange + metadata *gitalypb.ReplicaID_Metadata + term uint64 + index uint64 + leaderID uint64 + destinationStorageName string + eventID EventID } // ConfChangeContext wraps both event ID and metadata for config changes type ConfChangeContext struct { - EventID EventID `json:"event_id,omitempty"` - Metadata *gitalypb.ReplicaID_Metadata `json:"metadata,omitempty"` + EventID EventID `json:"event_id,omitempty"` + Metadata *gitalypb.ReplicaID_Metadata `json:"metadata,omitempty"` + DestinationStorageName string `json:"destination_storage_name,omitempty"` } // NewReplicaConfChanges creates a new ReplicaConfChanges instance. @@ -48,16 +50,18 @@ func NewReplicaConfChanges( term uint64, index uint64, leaderID uint64, + destinationStorageName string, eventID EventID, metadata *gitalypb.ReplicaID_Metadata, ) *ReplicaConfChanges { return &ReplicaConfChanges{ - changes: make([]ReplicaConfChange, 0), - metadata: metadata, - term: term, - index: index, - leaderID: leaderID, - eventID: eventID, + changes: make([]ReplicaConfChange, 0), + metadata: metadata, + destinationStorageName: destinationStorageName, + term: term, + index: index, + leaderID: leaderID, + eventID: eventID, } } @@ -79,6 +83,11 @@ func (r *ReplicaConfChanges) Metadata() *gitalypb.ReplicaID_Metadata { return r.metadata } +// DestinationStorageName returns the destination storage name associated with the configuration changes. +func (r *ReplicaConfChanges) DestinationStorageName() string { + return r.destinationStorageName +} + // Term returns the term of the configuration changes. func (r *ReplicaConfChanges) Term() uint64 { return r.term @@ -144,8 +153,9 @@ func (r *ReplicaConfChanges) ToConfChangeV2() (raftpb.ConfChangeV2, error) { func (r *ReplicaConfChanges) encodeContext() ([]byte, error) { context := &ConfChangeContext{ - EventID: r.eventID, - Metadata: r.metadata, + EventID: r.eventID, + Metadata: r.metadata, + DestinationStorageName: r.destinationStorageName, } return json.Marshal(context) @@ -167,17 +177,17 @@ func parseChangeType(ccType raftpb.ConfChangeType) (ConfChangeType, error) { } } -func parseContext(context []byte) (EventID, *gitalypb.ReplicaID_Metadata, error) { +func parseContext(context []byte) (EventID, string, *gitalypb.ReplicaID_Metadata, error) { if len(context) == 0 { - return 0, nil, nil + return 0, "", nil, nil } var confChangeContext ConfChangeContext if err := json.Unmarshal(context, &confChangeContext); err != nil { - return 0, nil, fmt.Errorf("unmarshal context: %w", err) + return 0, "", nil, fmt.Errorf("unmarshal context: %w", err) } - return confChangeContext.EventID, confChangeContext.Metadata, nil + return confChangeContext.EventID, confChangeContext.DestinationStorageName, confChangeContext.Metadata, nil } // ParseConfChange parses a raftpb.Entry containing a configuration change directly into a ReplicaConfChanges. @@ -190,7 +200,7 @@ func ParseConfChange(entry raftpb.Entry, leaderID uint64) (*ReplicaConfChanges, return nil, fmt.Errorf("unmarshalling EntryConfChange: %w", err) } - eventID, metadata, err := parseContext(cc.Context) + eventID, destinationStorageName, metadata, err := parseContext(cc.Context) if err != nil { return nil, err } @@ -200,7 +210,7 @@ func ParseConfChange(entry raftpb.Entry, leaderID uint64) (*ReplicaConfChanges, return nil, err } - result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, eventID, metadata) + result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, destinationStorageName, eventID, metadata) result.AddChange(cc.NodeID, nodeType) return result, nil } else if entry.Type == raftpb.EntryConfChangeV2 { @@ -213,12 +223,12 @@ func ParseConfChange(entry raftpb.Entry, leaderID uint64) (*ReplicaConfChanges, return nil, fmt.Errorf("no changes in ConfChangeV2") } - eventID, metadata, err := parseContext(cc.Context) + eventID, destinationStorageName, metadata, err := parseContext(cc.Context) if err != nil { return nil, err } - result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, eventID, metadata) + result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, destinationStorageName, eventID, metadata) for _, change := range cc.Changes { nodeType, err := parseChangeType(change.Type) diff --git a/internal/gitaly/storage/raftmgr/replica_conf_change_test.go b/internal/gitaly/storage/raftmgr/replica_conf_change_test.go index eb85b085a6abeb0e9098ab4dca8091d5d413b495..f2a65eecb7c3c08d16b9f04da1acf54d523aa487 100644 --- a/internal/gitaly/storage/raftmgr/replica_conf_change_test.go +++ b/internal/gitaly/storage/raftmgr/replica_conf_change_test.go @@ -7,6 +7,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" ) +var destinationStorage = "test-storage" + func TestConfChangeContext_EncodeDecode(t *testing.T) { t.Parallel() @@ -15,7 +17,7 @@ func TestConfChangeContext_EncodeDecode(t *testing.T) { } t.Run("encode and decode with event ID and metadata", func(t *testing.T) { - changes := NewReplicaConfChanges(1, 2, 3, 12345, metadata) + changes := NewReplicaConfChanges(1, 2, 3, destinationStorage, 12345, metadata) // Encode contextBytes, err := changes.encodeContext() @@ -23,15 +25,16 @@ func TestConfChangeContext_EncodeDecode(t *testing.T) { require.NotEmpty(t, contextBytes) // Decode - eventID, decodedMetadata, err := parseContext(contextBytes) + eventID, actualDestinationStorageName, decodedMetadata, err := parseContext(contextBytes) require.NoError(t, err) require.Equal(t, EventID(12345), eventID) + require.Equal(t, destinationStorage, actualDestinationStorageName) require.NotNil(t, decodedMetadata) require.Equal(t, "unix:///tmp/test.socket", decodedMetadata.GetAddress()) }) t.Run("encode and decode with only event ID", func(t *testing.T) { - changes := NewReplicaConfChanges(1, 2, 3, 67890, nil) + changes := NewReplicaConfChanges(1, 2, 3, destinationStorage, 67890, nil) // Encode contextBytes, err := changes.encodeContext() @@ -39,35 +42,39 @@ func TestConfChangeContext_EncodeDecode(t *testing.T) { require.NotEmpty(t, contextBytes) // Decode - eventID, decodedMetadata, err := parseContext(contextBytes) + eventID, destinationStorageName, decodedMetadata, err := parseContext(contextBytes) require.NoError(t, err) require.Equal(t, EventID(67890), eventID) + require.Equal(t, destinationStorage, destinationStorageName) require.Nil(t, decodedMetadata) }) t.Run("encode and decode with only metadata", func(t *testing.T) { - changes := NewReplicaConfChanges(1, 2, 3, 0, metadata) + changes := NewReplicaConfChanges(1, 2, 3, destinationStorage, 0, metadata) contextBytes, err := changes.encodeContext() require.NoError(t, err) require.NotEmpty(t, contextBytes) - eventID, decodedMetadata, err := parseContext(contextBytes) + eventID, destinationStorageName, decodedMetadata, err := parseContext(contextBytes) require.NoError(t, err) require.Equal(t, EventID(0), eventID) + require.Equal(t, destinationStorage, destinationStorageName) require.NotNil(t, decodedMetadata) require.Equal(t, "unix:///tmp/test.socket", decodedMetadata.GetAddress()) }) t.Run("empty context", func(t *testing.T) { - eventID, metadata, err := parseContext(nil) + eventID, destinationStorageName, metadata, err := parseContext(nil) require.NoError(t, err) require.Equal(t, EventID(0), eventID) + require.Equal(t, "", destinationStorageName) require.Nil(t, metadata) - eventID, metadata, err = parseContext([]byte{}) + eventID, destinationStorageName, metadata, err = parseContext([]byte{}) require.NoError(t, err) require.Equal(t, EventID(0), eventID) + require.Equal(t, "", destinationStorageName) require.Nil(t, metadata) }) } diff --git a/internal/gitaly/storage/raftmgr/replica_log_store_test.go b/internal/gitaly/storage/raftmgr/replica_log_store_test.go index 89266414f576a3f6a93a2509e378366d02599e1b..9b415264e6fba542e148f1b177da68010bc140bc 100644 --- a/internal/gitaly/storage/raftmgr/replica_log_store_test.go +++ b/internal/gitaly/storage/raftmgr/replica_log_store_test.go @@ -24,7 +24,7 @@ func setupLogStore(t *testing.T, ctx context.Context, cfg config.Cfg) *ReplicaLo logger := testhelper.NewLogger(t) db := getTestDBManager(t, ctx, cfg, logger) posTracker := log.NewPositionTracker() - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) initStatus, err := logStore.initialize(ctx, 0) @@ -69,7 +69,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { canceledCtx, cancel := context.WithCancel(ctx) cancel() // Cancel immediately to force failure - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) // Initialize with canceled context should fail with InitStatusUnknown @@ -98,7 +98,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // to verify it properly bootstraps from the saved state // Use a new position tracker to avoid "already registered" errors posTracker2 := log.NewPositionTracker() - rs2, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker2, logger, NewMetrics()) + rs2, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker2, logger, NewMetrics()) require.NoError(t, err) // It should now initialize as bootstrapped @@ -121,7 +121,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Pre-populate n entries prepopulateEntries(t, ctx, cfg, stagingDir, stateDir, appended) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) _, err = logStore.initialize(ctx, 0) @@ -147,7 +147,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { db := getTestDBManager(t, ctx, cfg, logger) posTracker := log.NewPositionTracker() - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -163,7 +163,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(0), lastIndex) - require.Empty(t, logStore.consumer.(*mockConsumer).GetNotifications()) + require.Empty(t, logStore.consumer.(*MockConsumer).GetNotifications()) }) t.Run("raft log store was bootstrapped, no left-over log entries after restart", func(t *testing.T) { @@ -179,7 +179,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { db, stateDir := prepopulateLogStore(t, ctx, cfg, 3, 3) // Restart the log store using the same state dir - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, testhelper.TempDir(t), stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, testhelper.TempDir(t), stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -211,7 +211,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { lowWaterMark: storage.LSN(4), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) }) t.Run("raft log store was bootstrapped, some log entries are left over", func(t *testing.T) { @@ -226,7 +226,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { // Simulate a prior session db, stateDir := prepopulateLogStore(t, ctx, cfg, 5, 3) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, testhelper.TempDir(t), stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, testhelper.TempDir(t), stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -254,7 +254,7 @@ func TestReplicaLogStore_Initialize(t *testing.T) { lowWaterMark: storage.LSN(3), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) }) } @@ -314,7 +314,7 @@ func TestReplicaLogStore_InitializeExistingPartition(t *testing.T) { require.NoError(t, localLog.Close()) // Now initialize a raft log store on the same directories - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -368,7 +368,7 @@ func TestReplicaLogStore_InitializeExistingPartition(t *testing.T) { metrics := NewMetrics() // PHASE 1: Initialize with Raft enabled - logStore1, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore1, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) require.NoError(t, err) initStatus, err := logStore1.initialize(ctx, 0) @@ -433,7 +433,7 @@ func TestReplicaLogStore_InitializeExistingPartition(t *testing.T) { db, stagingDir, stateDir, - &mockConsumer{}, + &MockConsumer{}, log.NewPositionTracker(), // Create a new position tracker logger, metrics, @@ -912,7 +912,7 @@ func TestReplicaLogStore_Term(t *testing.T) { logger := testhelper.NewLogger(t) db := getTestDBManager(t, ctx, cfg, logger) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) _, err = logStore.initialize(ctx, 0) @@ -925,7 +925,7 @@ func TestReplicaLogStore_Term(t *testing.T) { require.NoError(t, logStore.close()) // Now restart the log store - logStore, err = NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore, err = NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) defer func() { require.NoError(t, logStore.close()) }() @@ -1242,7 +1242,7 @@ func testAppendLogEntry(t *testing.T, appendFunc func(*testing.T, context.Contex prepopulateEntries(t, ctx, cfg, stagingDir, stateDir, 10) - logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore("test-storage", 1, cfg.Raft, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) _, err = logStore.initialize(ctx, 0) require.NoError(t, err) @@ -1295,7 +1295,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { } // Has not received any notification, yet. Highest appendedLSN is 3. - require.Empty(t, logStore.consumer.(*mockConsumer).GetNotifications()) + require.Empty(t, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 1 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 1, Vote: 1, Term: 1})) @@ -1309,7 +1309,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 2 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 2, Vote: 1, Term: 1})) @@ -1329,7 +1329,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(2), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 3 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 3, Vote: 1, Term: 1})) @@ -1355,7 +1355,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) }) t.Run("notify consumer since the low water mark", func(t *testing.T) { @@ -1377,7 +1377,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { } // Has not received any notification, yet. Highest appendedLSN is 3. - require.Empty(t, logStore.consumer.(*mockConsumer).GetNotifications()) + require.Empty(t, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 1 require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 1, Vote: 1, Term: 1})) @@ -1391,7 +1391,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(1), highWaterMark: storage.LSN(1), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Simulate applying up to log entry 1 require.NoError(t, logStore.localLog.AcknowledgePosition(log.AppliedPosition, storage.LSN(1))) @@ -1415,7 +1415,7 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(2), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Committed set to 3, but don't update low water mark require.NoError(t, logStore.saveHardState(raftpb.HardState{Commit: 3, Vote: 1, Term: 1})) @@ -1441,14 +1441,14 @@ func TestReplicaLogStore_SaveHardState(t *testing.T) { lowWaterMark: storage.LSN(2), highWaterMark: storage.LSN(3), }, - }, logStore.consumer.(*mockConsumer).GetNotifications()) + }, logStore.consumer.(*MockConsumer).GetNotifications()) // Simulate applying up to log entry 3 require.NoError(t, logStore.localLog.AcknowledgePosition(log.AppliedPosition, storage.LSN(3))) require.Equal(t, storage.LSN(4), logStore.localLog.LowWaterMark()) // No new notifications are sent. - require.Equal(t, 3, len(logStore.consumer.(*mockConsumer).GetNotifications())) + require.Equal(t, 3, len(logStore.consumer.(*MockConsumer).GetNotifications())) }) t.Run("reject LSN beyond appendedLSN", func(t *testing.T) { diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index ac8cf18af5303049a3bed07b05eaadb681bfd412..c9263ff7c353adf2d3f2b31b5d86791588738f61 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -30,7 +30,7 @@ import ( ) const ( - waitTimeout = 5 * time.Second + waitTimeout = 10 * time.Second ) func raftConfigsForTest(t *testing.T) config.Raft { @@ -232,15 +232,16 @@ func TestReplica_Initialize(t *testing.T) { require.NoError(t, localLog.Close()) // Now create a Raft log store pointing to the same directories - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) require.NoError(t, err) raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") - mgr, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) + mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -310,7 +311,7 @@ func TestReplica_Initialize(t *testing.T) { require.NoError(t, localLog.Close()) // Now create a Raft log store pointing to the same directories - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, NewMetrics()) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, NewMetrics()) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -318,7 +319,8 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -355,7 +357,7 @@ func TestReplica_Initialize(t *testing.T) { metrics := NewMetrics() // PHASE 1: Create a new partition with Raft enabled - logStore1, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore1, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -363,7 +365,8 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - mgr1, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore1, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr1, err := raftFactory(ctx, storageName, logStore1, logger, metrics) require.NoError(t, err) // Initialize the raft replica @@ -422,10 +425,11 @@ func TestReplica_Initialize(t *testing.T) { } // PHASE 3: Re-enable Raft - logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) + logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) - mgr2, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore2, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr2, err := raftFactory(ctx, storageName, logStore2, logger, metrics) require.NoError(t, err) // Re-initialize Raft with the highest LSN @@ -700,7 +704,7 @@ func TestReplica_AppendLogEntry(t *testing.T) { db, dbMgr := dbSetup(t, ctx, cfg, testhelper.TempDir(t), storageName, logger) // Create a Raft log store - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -709,11 +713,10 @@ func TestReplica_AppendLogEntry(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder), WithOpTimeout(1*time.Nanosecond)) // Create replica with very short operation timeout + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") mgr, err := raftFactory( ctx, - 1, storageName, - NewPartitionKey(storageName, partitionID), logStore, logger, metrics, @@ -822,7 +825,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { db, err := dbMgr.GetDB(cfg.Storages[0].Name) require.NoError(t, err) - logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, metrics) + logStore, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, metrics) require.NoError(t, err) raftNode, err := NewNode(cfg, logger, dbMgr, nil) @@ -831,7 +834,8 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) // Configure replica - mgr, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) for _, f := range setupFuncs { @@ -876,7 +880,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { db, err := dbMgr.GetDB(env.cfg.Storages[0].Name) require.NoError(t, err) - logStore, err := NewReplicaLogStore(env.storageName, env.partitionID, raftCfg, db, env.stagingDir, env.stateDir, &mockConsumer{}, log.NewPositionTracker(), logger, env.metrics) + logStore, err := NewReplicaLogStore(env.storageName, env.partitionID, raftCfg, db, env.stagingDir, env.stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, env.metrics) require.NoError(t, err) raftNode, err := NewNode(env.cfg, logger, dbMgr, nil) @@ -884,7 +888,8 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(env.recorder)) - recoveryMgr, err := raftFactory(ctx, 1, env.storageName, NewPartitionKey(env.storageName, env.partitionID), logStore, logger, env.metrics) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(env.storageName, env.partitionID), 1, "") + recoveryMgr, err := raftFactory(ctx, env.storageName, logStore, logger, env.metrics) require.NoError(t, err) // Initialize with the last known LSN @@ -1665,7 +1670,8 @@ func TestReplica_StorageConnection(t *testing.T) { // Create factory that connects replicas to storage raftFactory := DefaultFactoryWithNode(cfg.Raft, raftNode) - replica, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + replica, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, replica.Close()) }) @@ -1696,14 +1702,16 @@ func TestReplica_StorageConnection(t *testing.T) { }) t.Run("multiple replicas for same partition key", func(t *testing.T) { - duplicateReplica, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + duplicateReplica, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, duplicateReplica) }) t.Run("Register different replicas for different partition keys", func(t *testing.T) { partitionID := storage.PartitionID(2) - replicaTwo, err := raftFactory(ctx, 1, storageName, NewPartitionKey(storageName, partitionID), logStore, logger, NewMetrics()) + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + replicaTwo, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, replicaTwo) }) @@ -1743,11 +1751,11 @@ func TestReplica_AddNode(t *testing.T) { }, timeout, 5*time.Millisecond, "replica should become leader") } - // waitForVoters waits for both replicas to have the expected number of voters - waitForReplicaLeader := func(t *testing.T, replicaOne, replicaTwo *Replica, expectedVoters int, timeout time.Duration) { + // waitForReplicaLeader waits for both replicas to have the same leader + waitForReplicaLeader := func(t *testing.T, replicaOne, replicaTwo *Replica, timeout time.Duration) { require.Eventually(t, func() bool { return replicaTwo.leadership.GetLeaderID() == replicaOne.memberID - }, timeout, 5*time.Millisecond, "configuration should stabilize with %d voters", expectedVoters) + }, timeout, 5*time.Millisecond, "replica two should have the same leader as replica one") } drainNotificationQueues := func(t *testing.T, replicas ...*Replica) { @@ -1836,7 +1844,7 @@ func TestReplica_AddNode(t *testing.T) { drainNotificationQueues(t, replica, replicaTwo) - waitForReplicaLeader(t, replica, replicaTwo, 2, waitTimeout) + waitForReplicaLeader(t, replica, replicaTwo, waitTimeout) testhelper.RequirePromMetrics(t, metrics, ` # HELP gitaly_raft_log_entries_processed Rate of log entries processed. @@ -1977,7 +1985,7 @@ func TestReplica_AddNode(t *testing.T) { addedReplica := addressesToReplicas[addedAddress] // Wait for voters count to stabilize between both replicas - waitForReplicaLeader(t, replica, addedReplica, 2, waitTimeout) + waitForReplicaLeader(t, replica, addedReplica, waitTimeout) drainNotificationQueues(t, replica, addedReplica) @@ -2019,7 +2027,7 @@ func TestReplica_AddNode(t *testing.T) { // Wait for the replica to elect itself as leader waitForLeadership(t, replica, waitTimeout) - err := replica.RemoveNode(ctx, 999) + err := replica.RemoveNode(ctx, 999, storageName) require.EqualError(t, err, "translating member ID: no address found for memberID 999") }) @@ -2041,9 +2049,7 @@ func TestReplica_AddNode(t *testing.T) { // Set a random leader ID to simulate a non-leader replica.leadership.SetLeader(999, false) - destination := "default" - - err := replica.AddNode(ctx, "gitaly-node-2:8075", destination) + err := replica.AddNode(ctx, "gitaly-node-2:8075", storageName) require.EqualError(t, err, "replica is not the leader", "adding node should fail when not leader") testhelper.RequirePromMetrics(t, metrics, ` diff --git a/internal/gitaly/storage/raftmgr/routing_table.go b/internal/gitaly/storage/raftmgr/routing_table.go index 5fba4c93147a282daa4c3d4d17d35247bed9c1f8..0ea3ed9d5c3cc9cb8135cc2d0225f8e41e921a45 100644 --- a/internal/gitaly/storage/raftmgr/routing_table.go +++ b/internal/gitaly/storage/raftmgr/routing_table.go @@ -27,18 +27,12 @@ type RoutingTableEntry struct { Index uint64 } -// ReplicaMetadata contains additional information about a replica -// that is needed for routing messages. -type ReplicaMetadata struct { - Address string -} - // RoutingTable handles translation between member IDs and addresses type RoutingTable interface { Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) UpsertEntry(entry RoutingTableEntry) error - ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error + ApplyReplicaConfChange(partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error ListEntries() (map[string]*RoutingTableEntry, error) } @@ -145,7 +139,7 @@ func (r *kvRoutingTable) Translate(partitionKey *gitalypb.RaftPartitionKey, memb return nil, fmt.Errorf("no address found for memberID %d", memberID) } -func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error { +func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error { routingTableEntry, err := r.GetEntry(partitionKey) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("getting routing table entry: %w", err) @@ -162,6 +156,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey routingTableEntry.Index = changes.Index() metadata := changes.Metadata() + destinationStorageName := changes.DestinationStorageName() for _, confChange := range changes.Changes() { switch confChange.changeType { @@ -173,13 +168,13 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey if slices.ContainsFunc(routingTableEntry.Replicas, func(r *gitalypb.ReplicaID) bool { return r.GetMemberId() == confChange.memberID }) { - return fmt.Errorf("member ID %d already exists", confChange.memberID) + continue } replica := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: storageName, + StorageName: destinationStorageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, } @@ -199,7 +194,7 @@ func (r *kvRoutingTable) ApplyReplicaConfChange(storageName string, partitionKey learner := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, - StorageName: storageName, + StorageName: destinationStorageName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_LEARNER, } diff --git a/internal/gitaly/storage/raftmgr/routing_table_test.go b/internal/gitaly/storage/raftmgr/routing_table_test.go index e1daf265b4aa0aa2a0411d2d0f774254ef302dc7..5b25f94435cb32bf22bd94cf78e18a062c6b180d 100644 --- a/internal/gitaly/storage/raftmgr/routing_table_test.go +++ b/internal/gitaly/storage/raftmgr/routing_table_test.go @@ -12,6 +12,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" ) +var destinationStorageName = "test-storage" + func TestPersistentRoutingTable(t *testing.T) { t.Parallel() @@ -34,7 +36,7 @@ func TestPersistentRoutingTable(t *testing.T) { { PartitionKey: partitionKey, MemberId: uint64(memberID), - StorageName: "test-storage", + StorageName: destinationStorageName, Metadata: &gitalypb.ReplicaID_Metadata{ Address: address, }, @@ -50,7 +52,6 @@ func TestPersistentRoutingTable(t *testing.T) { replica, err := rt.Translate(partitionKey, uint64(memberID)) require.NoError(t, err) require.Equal(t, address, replica.GetMetadata().GetAddress()) - require.Equal(t, "test-storage", replica.GetStorageName()) }) t.Run("stale entry rejected", func(t *testing.T) { @@ -112,10 +113,10 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) - changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -166,10 +167,10 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*initialEntry) require.NoError(t, err) - changes := NewReplicaConfChanges(2, 2, 1, 1, nil) + changes := NewReplicaConfChanges(2, 2, 1, destinationStorageName, 1, nil) changes.AddChange(2, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -196,10 +197,10 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) - changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddLearnerNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -223,10 +224,10 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) - changes := NewReplicaConfChanges(1, 1, 0, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 0, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(0, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID should be non-zero") }) @@ -246,21 +247,25 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) // First add a node - changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, destinationStorageName, 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) // Try to add the same node ID again - changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) + changes = NewReplicaConfChanges(2, 2, 1, destinationStorageName, 1, createMetadata("localhost:5678")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) - require.Error(t, err) - require.Contains(t, err.Error(), "member ID 1 already exists") - }) + err = rt.ApplyReplicaConfChange(partitionKey, changes) + require.NoError(t, err) + updatedEntry, err := rt.GetEntry(partitionKey) + require.NoError(t, err) + require.Len(t, updatedEntry.Replicas, 1) + require.Equal(t, uint64(1), updatedEntry.Replicas[0].GetMemberId()) + require.Equal(t, "localhost:1234", updatedEntry.Replicas[0].GetMetadata().GetAddress()) + }) t.Run("should error when updating non-existent member ID", func(t *testing.T) { t.Parallel() @@ -276,17 +281,17 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) // Add a node with ID 1 - changes := NewReplicaConfChanges(1, 1, 1, 1, createMetadata("localhost:1234")) + changes := NewReplicaConfChanges(1, 1, 1, "test-storage", 1, createMetadata("localhost:1234")) changes.AddChange(1, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) // Try to update a non-existent node with ID 2 - changes = NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) + changes = NewReplicaConfChanges(2, 2, 1, "test-storage", 1, createMetadata("localhost:5678")) changes.AddChange(2, ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "member ID 2 not found for update") }) @@ -321,10 +326,10 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*entry) require.NoError(t, err) - changes := NewReplicaConfChanges(entry.Term, entry.Index, 1, 1, nil) + changes := NewReplicaConfChanges(entry.Term, entry.Index, 1, "test-storage", 1, nil) changes.AddChange(1, ConfChangeRemoveNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.Error(t, err) require.Contains(t, err.Error(), "no replicas to upsert") }) @@ -360,10 +365,10 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*initialEntry) require.NoError(t, err) - changes := NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:5678")) + changes := NewReplicaConfChanges(2, 2, 1, "test-storage", 1, createMetadata("localhost:5678")) changes.AddChange(1, ConfChangeUpdateNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) @@ -405,11 +410,11 @@ func TestApplyReplicaConfChange(t *testing.T) { err = rt.UpsertEntry(*initialEntry) require.NoError(t, err) - changes := NewReplicaConfChanges(2, 2, 1, 1, createMetadata("localhost:8888")) + changes := NewReplicaConfChanges(2, 2, 1, "test-storage", 1, createMetadata("localhost:8888")) changes.AddChange(1, ConfChangeRemoveNode) changes.AddChange(2, ConfChangeAddNode) - err = rt.ApplyReplicaConfChange("test-authority", partitionKey, changes) + err = rt.ApplyReplicaConfChange(partitionKey, changes) require.NoError(t, err) updatedEntry, err := rt.GetEntry(partitionKey) diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index 0b6dc03e05a5755268f15e0497b3f13637529b5c..383032c8a04c908a26c3871c5071fed24e1878fb 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -3,7 +3,6 @@ package raftmgr import ( "context" "net" - "sync" "testing" "github.com/stretchr/testify/require" @@ -42,33 +41,9 @@ func (m *mockReplica) Step(ctx context.Context, msg raftpb.Message) error { return nil } -type mockConsumer struct { - notifications []mockNotification - mutex sync.Mutex -} - -type mockNotification struct { - storageName string - partitionID storage.PartitionID - lowWaterMark storage.LSN - highWaterMark storage.LSN -} - -func (mc *mockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { - mc.mutex.Lock() - defer mc.mutex.Unlock() - mc.notifications = append(mc.notifications, mockNotification{ - storageName: storageName, - partitionID: partitionID, - lowWaterMark: lowWaterMark, - highWaterMark: committedLSN, - }) -} - -func (mc *mockConsumer) GetNotifications() []mockNotification { - mc.mutex.Lock() - defer mc.mutex.Unlock() - return mc.notifications +// IsStarted is a mock implementation of RaftReplica.IsStarted +func (m *mockReplica) IsStarted() bool { + return true } func openTestDB(t *testing.T, ctx context.Context, cfg config.Cfg, logger logger.Logger) *databasemgr.DBManager { @@ -103,6 +78,10 @@ func createRaftReplica(t *testing.T, ctx context.Context, memberID uint64, addre Address: address, Options: opts, } + relativePath := "git.git" + storageName := "default" + + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), memberID, relativePath) return createRaftReplicaWithConfig(t, ctx, raftCfg, config, metrics) } @@ -124,7 +103,7 @@ func createRaftReplicaWithConfig(t *testing.T, ctx context.Context, raftCfg conf stateDir := testhelper.TempDir(t) posTracker := log.NewPositionTracker() - logStore, err := NewReplicaLogStore(storageName, config.PartitionID, raftCfg, db, stagingDir, stateDir, &mockConsumer{}, posTracker, logger, metrics) + logStore, err := NewReplicaLogStore(storageName, config.PartitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, posTracker, logger, metrics) if err != nil { return nil, err } @@ -141,7 +120,7 @@ func createRaftReplicaWithConfig(t *testing.T, ctx context.Context, raftCfg conf } raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, config.Options...) - return raftFactory(ctx, config.MemberID, storageName, NewPartitionKey(storageName, config.PartitionID), logStore, logger, metrics) + return raftFactory(ctx, storageName, logStore, logger, metrics) } func createTempServer(t *testing.T, transport *GrpcTransport) (string, *grpc.Server) { diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 98118781be3ea8ae1ada1ef06371096537665fad..59ee11509274178afc41686681ef40522e6a7d0a 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -65,12 +65,21 @@ func (f Factory) New( if f.raftCfg.Enabled { factory := f.raftFactory - // TODO: The PartitionKey will be retrieved from the incoming context when a Raft - // replica is booted up. This allows us to maintain a persistent key across - // all replicas of the partition. - // storageName will then represent the target storage where we intend on - // placing the new replicated partition. - partitionKey := raftmgr.NewPartitionKey(storageName, partitionID) + partitionInfo := storage.ExtractPartitionInfo(ctx) + if partitionInfo.PartitionKey == nil { + // If partitionKey is not set, it means: + // - It's a first self bootstrapped node in the cluster, + // so we need to set the partitionKey using local storage and partitionID + // and we will set the memberID to 1. + // - A node was previously part of a cluster (with a different member ID) + // and later becomes leader through normal Raft leader election + // and receives requests from Rails without partition context + // so we need a way to retrieve the partitionKey and memberID which was + // originally used by the replica before it became leader. + // https://gitlab.com/gitlab-org/gitaly/-/issues/6877 + partitionKey := raftmgr.NewPartitionKey(storageName, partitionID) + ctx = storage.ContextWithPartitionInfo(ctx, partitionKey, 1, partitionInfo.RelativePath) + } absoluteStateDir = getRaftPartitionPath(storageName, partitionID, absoluteStateDir) @@ -91,9 +100,7 @@ func (f Factory) New( } raftReplica, err := factory( ctx, - 1, storageName, - partitionKey, replicaLogStore, logger, f.partitionMetrics.raft, diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index 7854becb8488366c4ed98bc6a64d216664f7462b..1e130be50a464bdb434b5fc7ece441d708c56d53 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -541,7 +541,6 @@ func (sm *StorageManager) startPartition(ctx context.Context, partitionID storag } logger := sm.logger.WithField("partition_id", partitionID) - mgr := sm.partitionFactory.New( ctx, logger, diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go index ebdbc9d8d9ea01637acfcc9b1b81c27e830848a0..1fe4af2706afa3b8e2f962154e4a8ecf124207f1 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go @@ -32,6 +32,7 @@ import ( type mockPartitionFactory struct { new func( + ctx context.Context, logger log.Logger, partitionID storage.PartitionID, db keyvalue.Transactioner, @@ -47,6 +48,7 @@ type mockPartitionFactory struct { func newStubPartitionFactory() PartitionFactory { return mockPartitionFactory{ new: func( + ctx context.Context, logger log.Logger, partitionID storage.PartitionID, db keyvalue.Transactioner, @@ -129,6 +131,7 @@ func (m mockPartitionFactory) New( stagingDir string, ) Partition { return m.new( + ctx, logger, partitionID, db, diff --git a/proto/cluster.proto b/proto/cluster.proto index 1af29fc130226312cdc766d3c45e8df3f165accd..5fc780607807c3e65447988c8f2757e2c9df7f95 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -59,6 +59,9 @@ message ReplicaID { message Metadata { // address is the network address of the replica. string address = 1; + // relative_path of the repository + string relative_path = 2; + } // metadata contains replica information. Metadata metadata = 4; diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index 63c345ac4b31be8de4a5b0642d69784d91b24e33..4117391f27b2886e0e7fe141ada95c92b9ca5191 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -1122,7 +1122,9 @@ func (x *RaftEntry_LogData) GetPacked() []byte { type ReplicaID_Metadata struct { state protoimpl.MessageState `protogen:"open.v1"` // address is the network address of the replica. - Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + // relative_path of the repository + RelativePath string `protobuf:"bytes,2,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1164,6 +1166,13 @@ func (x *ReplicaID_Metadata) GetAddress() string { return "" } +func (x *ReplicaID_Metadata) GetRelativePath() string { + if x != nil { + return x.RelativePath + } + return "" +} + // StorageStats contains statistics for a specific storage/authority. type ClusterStatistics_StorageStats struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1332,7 +1341,7 @@ var file_cluster_proto_rawDesc = string([]byte{ 0x61, 0x63, 0x6b, 0x65, 0x64, 0x22, 0x28, 0x0a, 0x10, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, - 0xfa, 0x02, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x3d, 0x0a, + 0x9f, 0x03, 0x0a, 0x09, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, @@ -1347,188 +1356,190 @@ var file_cluster_proto_rawDesc = string([]byte{ 0x64, 0x61, 0x74, 0x61, 0x12, 0x31, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, - 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, 0x24, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x1a, 0x49, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x5d, 0x0a, - 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x18, - 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, - 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x52, 0x45, - 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x56, 0x4f, 0x54, 0x45, 0x52, - 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, 0x02, 0x22, 0x90, 0x01, 0x0a, - 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, - 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x70, 0x62, 0x2e, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, - 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, 0x52, 0x61, 0x66, 0x74, 0x53, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, + 0x74, 0x68, 0x22, 0x5d, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x1c, 0x0a, 0x18, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, + 0x16, 0x0a, 0x12, 0x52, 0x45, 0x50, 0x4c, 0x49, 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, + 0x56, 0x4f, 0x54, 0x45, 0x52, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x50, 0x4c, 0x49, + 0x43, 0x41, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x45, 0x41, 0x52, 0x4e, 0x45, 0x52, 0x10, + 0x02, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x72, 0x61, 0x66, + 0x74, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x22, 0x15, 0x0a, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x86, 0x01, 0x0a, 0x1a, + 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, + 0x66, 0x74, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, + 0x4d, 0x73, 0x67, 0x12, 0x16, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 0x15, 0x72, + 0x61, 0x66, 0x74, 0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, + 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, + 0x74, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, + 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x53, 0x69, 0x7a, 0x65, 0x22, 0xb4, 0x02, 0x0a, 0x12, 0x4a, + 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, + 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, + 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, + 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, + 0x09, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x08, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, + 0x72, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x14, + 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, + 0x6e, 0x64, 0x65, 0x78, 0x12, 0x27, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x42, 0x04, 0x88, 0xc6, 0x2c, 0x01, + 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, + 0x74, 0x68, 0x12, 0x2d, 0x0a, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x08, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, + 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x73, 0x22, 0x15, 0x0a, 0x13, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa1, 0x02, 0x0a, 0x14, 0x47, 0x65, 0x74, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, + 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, + 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, + 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, + 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, + 0x50, 0x61, 0x74, 0x68, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x36, + 0x0a, 0x17, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x5f, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x15, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x44, + 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x34, 0x0a, 0x16, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, + 0x65, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x14, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x52, + 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x22, 0xd2, 0x03, 0x0a, + 0x11, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, + 0x63, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x74, 0x6f, + 0x74, 0x61, 0x6c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2d, 0x0a, + 0x12, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x68, 0x65, 0x61, 0x6c, 0x74, + 0x68, 0x79, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, 0x0a, 0x0e, + 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x72, + 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x68, + 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x50, + 0x0a, 0x0d, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, + 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, + 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x52, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, + 0x1a, 0x56, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, + 0x12, 0x21, 0x0a, 0x0c, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6f, + 0x75, 0x6e, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x72, 0x65, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x67, 0x0a, 0x11, 0x53, 0x74, 0x6f, 0x72, + 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x3c, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, + 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x22, 0xa7, 0x04, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x63, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x47, 0x0a, 0x08, 0x72, 0x65, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, + 0x65, 0x72, 0x6d, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, + 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x25, + 0x0a, 0x0e, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, + 0x18, 0x08, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, + 0x50, 0x61, 0x74, 0x68, 0x73, 0x1a, 0xd3, 0x01, 0x0a, 0x0d, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, + 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, + 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x0a, 0x69, 0x73, 0x5f, 0x68, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x69, 0x73, 0x48, 0x65, + 0x61, 0x6c, 0x74, 0x68, 0x79, 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x69, 0x6e, + 0x64, 0x65, 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6c, 0x61, 0x73, 0x74, 0x49, + 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1f, 0x0a, 0x0b, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x6e, + 0x64, 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6d, 0x61, 0x74, 0x63, 0x68, + 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x37, 0x0a, 0x16, 0x52, + 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x49, 0x64, 0x22, 0x73, 0x0a, 0x17, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x39, + 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x0a, 0x73, + 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x32, 0xcf, 0x03, 0x0a, 0x0b, 0x52, 0x61, + 0x66, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, + 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, + 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, + 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, + 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x08, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6d, 0x73, - 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, 0x61, 0x66, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x16, - 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, - 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x42, 0x17, 0x0a, 0x15, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x73, - 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, - 0x64, 0x0a, 0x1b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, - 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x5f, 0x73, 0x69, 0x7a, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, - 0x74, 0x53, 0x69, 0x7a, 0x65, 0x22, 0xb4, 0x02, 0x0a, 0x12, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3d, 0x0a, 0x0d, - 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, - 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x6c, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, - 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x65, 0x6d, 0x62, - 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6d, 0x65, 0x6d, - 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, - 0x65, 0x78, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, - 0x27, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, - 0x06, 0x20, 0x01, 0x28, 0x09, 0x42, 0x04, 0x88, 0xc6, 0x2c, 0x01, 0x52, 0x0b, 0x73, 0x74, 0x6f, - 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, - 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x2d, 0x0a, - 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x49, 0x44, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x22, 0x15, 0x0a, 0x13, - 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0xa1, 0x02, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, - 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0d, 0x70, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, - 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, - 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, - 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x36, 0x0a, 0x17, 0x69, 0x6e, 0x63, - 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x64, 0x65, 0x74, - 0x61, 0x69, 0x6c, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x15, 0x69, 0x6e, 0x63, 0x6c, - 0x75, 0x64, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, - 0x73, 0x12, 0x34, 0x0a, 0x16, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x72, 0x65, 0x6c, - 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x14, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x52, 0x65, 0x6c, 0x61, 0x74, 0x69, - 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, 0x22, 0xd2, 0x03, 0x0a, 0x11, 0x43, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x12, 0x29, 0x0a, - 0x10, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2d, 0x0a, 0x12, 0x68, 0x65, 0x61, 0x6c, - 0x74, 0x68, 0x79, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x74, 0x6f, 0x74, 0x61, 0x6c, - 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x0d, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x29, - 0x0a, 0x10, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x5f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x79, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x50, 0x0a, 0x0d, 0x73, 0x74, 0x6f, - 0x72, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x72, - 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0c, 0x73, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x1a, 0x56, 0x0a, 0x0c, 0x53, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x6c, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x0b, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x23, - 0x0a, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x43, 0x6f, - 0x75, 0x6e, 0x74, 0x1a, 0x67, 0x0a, 0x11, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, - 0x61, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x3c, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, - 0x74, 0x69, 0x63, 0x73, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, - 0x73, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xa7, 0x04, 0x0a, - 0x15, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x4b, 0x65, 0x79, 0x12, 0x47, 0x0a, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x08, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x12, 0x1b, 0x0a, - 0x09, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x08, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, - 0x72, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, 0x12, 0x14, - 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x69, - 0x6e, 0x64, 0x65, 0x78, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, - 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, - 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x6c, - 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x73, 0x18, 0x08, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x73, - 0x1a, 0xd3, 0x01, 0x0a, 0x0d, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x30, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x49, 0x44, 0x52, 0x09, 0x72, 0x65, 0x70, 0x6c, 0x69, - 0x63, 0x61, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x1d, 0x0a, 0x0a, 0x69, 0x73, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x69, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x79, - 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6c, 0x61, 0x73, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, - 0x1f, 0x0a, 0x0b, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x64, 0x65, 0x78, - 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x37, 0x0a, 0x16, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, + 0x28, 0x01, 0x12, 0x50, 0x0a, 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, + 0x08, 0x01, 0x10, 0x02, 0x12, 0x58, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, + 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x30, 0x01, 0x12, 0x5b, + 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, + 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x22, - 0x73, 0x0a, 0x17, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, - 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, - 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, - 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, - 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x69, 0x73, - 0x74, 0x69, 0x63, 0x73, 0x32, 0xcf, 0x03, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x12, 0x52, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x12, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, - 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x63, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, - 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, - 0x6f, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x28, 0x01, 0x12, 0x50, 0x0a, - 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, 0x10, 0x02, 0x12, - 0x58, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x1c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, - 0x97, 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x0e, 0x47, 0x65, 0x74, - 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, - 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x38, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x02, 0x10, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, + 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x38, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var (