From 6668a7cea0fe7e8f2f5d8cf2b4c383b948de3361 Mon Sep 17 00:00:00 2001 From: Jia Shiang Gao Date: Tue, 5 Apr 2022 00:52:40 -0700 Subject: [PATCH] praefect/service: RepositoryReplicas RPC fails if a replica does not exist The original error message does not indicate that "a replica does not yet exist on the host" when the repository has not yet been replicated to all the hosts. This commit checks whether the error returned by the `CalculateChecksum` gRPC call has the DataLoss code. If it does, a more suitable error message is returned. --- internal/praefect/info_service_test.go | 287 ++++++++++-------- .../praefect/service/info/repositories.go | 13 + 2 files changed, 175 insertions(+), 125 deletions(-) diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index c89654f6ab2..b0f362616e3 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -2,6 +2,8 @@ package praefect import ( "math/rand" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/require" @@ -23,137 +25,172 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestInfoService_RepositoryReplicas(t *testing.T) { t.Parallel() - - var cfgs []gconfig.Cfg - var cfgNodes []*config.Node storages := []string{"g-1", "g-2", "g-3"} - for i, storage := range storages { - cfg := testcfg.Build(t, testcfg.WithStorages(storage)) - cfgs = append(cfgs, cfg) - cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( - deps.GetCfg(), - deps.GetRubyServer(), - deps.GetLocator(), - deps.GetTxManager(), - deps.GetGitCmdFactory(), - deps.GetCatfileCache(), - deps.GetConnsPool(), - deps.GetGit2goExecutor(), - 
deps.GetHousekeepingManager(), - )) - }, testserver.WithDisablePraefect()) - cfgNodes = append(cfgNodes, &config.Node{ - Storage: cfgs[i].Storages[0].Name, - Address: cfgs[i].SocketPath, - Token: cfgs[i].Auth.Token, - }) - } - - const virtualStorage = "default" - conf := config.Config{ - VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: cfgNodes}}, - Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}, - } - - ctx := testhelper.Context(t) - - db := testdb.New(t) - logger := testhelper.NewDiscardingLogEntry(t) - // the only thing used from the config is the grpc_latency_buckets which is not relevant for the test - txManager := transactions.NewManager(config.Config{}) - sidechannelRegistry := sidechannel.NewRegistry() - nodeSet, err := DialNodes( - ctx, - conf.VirtualStorages, - protoregistry.GitalyProtoPreregistered, - nil, - backchannel.NewClientHandshaker( - logger, - NewBackchannelServerFactory( - logger, - transaction.NewServer(txManager), + for _, tc := range []struct { + desc string + removeRepoStorageIndex []int + error error + }{ + { + desc: "Get all checksums successfully. And has a replica, which has a newer version repository.", + }, + { + desc: "Faild to get all checksums. Because one replica is not replicated yet", + removeRepoStorageIndex: []int{1}, + error: status.Errorf(codes.DataLoss, "replica on storage, which named g-2 does not yet exist on the host"), + }, + { + desc: "Faild to get all checksums. 
Because two replica is not replicated yet", + removeRepoStorageIndex: []int{1, 2}, + error: status.Errorf(codes.DataLoss, "replica on storage, which named g-2, g-3 does not yet exist on the host"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var cfgs []gconfig.Cfg + var cfgNodes []*config.Node + for i, storage := range storages { + cfg := testcfg.Build(t, testcfg.WithStorages(storage)) + cfgs = append(cfgs, cfg) + cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( + deps.GetCfg(), + deps.GetRubyServer(), + deps.GetLocator(), + deps.GetTxManager(), + deps.GetGitCmdFactory(), + deps.GetCatfileCache(), + deps.GetConnsPool(), + deps.GetGit2goExecutor(), + deps.GetHousekeepingManager(), + )) + }, testserver.WithDisablePraefect()) + cfgNodes = append(cfgNodes, &config.Node{ + Storage: cfgs[i].Storages[0].Name, + Address: cfgs[i].SocketPath, + Token: cfgs[i].Auth.Token, + }) + } + + const virtualStorage = "default" + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: cfgNodes}}, + Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}, + } + + ctx := testhelper.Context(t) + + db := testdb.New(t) + logger := testhelper.NewDiscardingLogEntry(t) + // the only thing used from the config is the grpc_latency_buckets which is not relevant for the test + txManager := transactions.NewManager(config.Config{}) + sidechannelRegistry := sidechannel.NewRegistry() + nodeSet, err := DialNodes( + ctx, + conf.VirtualStorages, + protoregistry.GitalyProtoPreregistered, + nil, + backchannel.NewClientHandshaker( + logger, + NewBackchannelServerFactory( + logger, + transaction.NewServer(txManager), + sidechannelRegistry, + ), + ), sidechannelRegistry, - ), - ), - sidechannelRegistry, - ) - require.NoError(t, err) - t.Cleanup(nodeSet.Close) - - // Use a transaction in the elector 
itself to avoid flakiness due to the health timeout. We mark - // only the first repo has healthy so the elector picks it always as the primary. - tx := db.Begin(t) - t.Cleanup(func() { tx.Rollback(t) }) - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ - "praefect-0": {virtualStorage: storages[0:1]}, - }) - elector := nodes.NewPerRepositoryElector(tx) - - conns := nodeSet.Connections() - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withConnections: conns, - withRepoStore: rs, - withRouter: NewPerRepositoryRouter( - conns, - elector, - StaticHealthChecker{virtualStorage: storages}, - NewLockedRandom(rand.New(rand.NewSource(0))), - rs, - datastore.NewAssignmentStore(db, conf.StorageNames()), - rs, - conf.DefaultReplicationFactors(), - ), - withPrimaryGetter: elector, - withTxMgr: txManager, - }) - // use cleanup to close the connections as gittest.CreateRepository will still use the connection - // for clean up after the test. - t.Cleanup(cleanup) - - client := gitalypb.NewPraefectInfoServiceClient(cc) - - testRepository, testRepoPath := gittest.CreateRepository(ctx, t, - // The helper was implemented with the test server in mind. Here we need use the virtual storage's name - // as the storage and the path of the storage we want to modify the replica in. - gconfig.Cfg{Storages: []gconfig.Storage{{Name: virtualStorage, Path: cfgs[1].Storages[0].Path}}}, - gittest.CreateRepositoryConfig{Seed: gittest.SeedGitLabTest, ClientConn: cc}, - ) - - // create a commit in the second replica so we can check that its checksum is different than the primary - gittest.WriteCommit(t, cfgs[1], testRepoPath, gittest.WithBranch("master")) - - // Increment the generation of the unmodified repositories so the below CalculateChecksum calls goes to one of them - // as the test expects the primary to have that checksum. 
- require.NoError(t, rs.IncrementGeneration(ctx, 1, cfgs[0].Storages[0].Name, []string{cfgs[2].Storages[0].Name})) - - // CalculateChecksum through praefect will get the checksum of the primary - checksum, err := gitalypb.NewRepositoryServiceClient(cc).CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: testRepository}) - require.NoError(t, err) - - resp, err := client.RepositoryReplicas(ctx, &gitalypb.RepositoryReplicasRequest{Repository: testRepository}) - require.NoError(t, err) - - require.Equal(t, checksum.Checksum, resp.Primary.Checksum) - var checked []string - for _, secondary := range resp.GetReplicas() { - switch storage := secondary.GetRepository().GetStorageName(); storage { - case conf.VirtualStorages[0].Nodes[1].Storage: - require.NotEqual(t, checksum.Checksum, secondary.Checksum, "should not be equal since we added a commit") - checked = append(checked, storage) - case conf.VirtualStorages[0].Nodes[2].Storage: - require.Equal(t, checksum.Checksum, secondary.Checksum) - checked = append(checked, storage) - default: - require.FailNow(t, "unexpected storage: %q", storage) - } + ) + require.NoError(t, err) + t.Cleanup(nodeSet.Close) + + // Use a transaction in the elector itself to avoid flakiness due to the health timeout. We mark + // only the first repo has healthy so the elector picks it always as the primary. 
+ tx := db.Begin(t) + t.Cleanup(func() { tx.Rollback(t) }) + testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ + "praefect-0": {virtualStorage: storages[0:1]}, + }) + elector := nodes.NewPerRepositoryElector(tx) + + conns := nodeSet.Connections() + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ + withConnections: conns, + withRepoStore: rs, + withRouter: NewPerRepositoryRouter( + conns, + elector, + StaticHealthChecker{virtualStorage: storages}, + NewLockedRandom(rand.New(rand.NewSource(0))), + rs, + datastore.NewAssignmentStore(db, conf.StorageNames()), + rs, + conf.DefaultReplicationFactors(), + ), + withPrimaryGetter: elector, + withTxMgr: txManager, + }) + // use cleanup to close the connections as gittest.CreateRepository will still use the connection + // for clean up after the test. + t.Cleanup(cleanup) + + client := gitalypb.NewPraefectInfoServiceClient(cc) + + testRepository, testRepoPath := gittest.CreateRepository(ctx, t, + // The helper was implemented with the test server in mind. Here we need use the virtual storage's name + // as the storage and the path of the storage we want to modify the replica in. + gconfig.Cfg{Storages: []gconfig.Storage{{Name: virtualStorage, Path: cfgs[1].Storages[0].Path}}}, + gittest.CreateRepositoryConfig{Seed: gittest.SeedGitLabTest, ClientConn: cc}, + ) + + // create a commit in the second replica so we can check that its checksum is different than the primary + gittest.WriteCommit(t, cfgs[1], testRepoPath, gittest.WithBranch("master")) + + // Increment the generation of the unmodified repositories so the below CalculateChecksum calls goes to one of them + // as the test expects the primary to have that checksum. 
+ require.NoError(t, rs.IncrementGeneration(ctx, 1, cfgs[0].Storages[0].Name, []string{cfgs[2].Storages[0].Name})) + + // CalculateChecksum through praefect will get the checksum of the primary + checksum, err := gitalypb.NewRepositoryServiceClient(cc).CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: testRepository}) + require.NoError(t, err) + var paths []string + for _, cfg := range cfgs { + paths = append(paths, cfg.Storages[0].Path) + } + // Force an empty HEAD file + for _, storageIndex := range tc.removeRepoStorageIndex { + require.NoError(t, os.Truncate(filepath.Join(paths[storageIndex], testRepository.RelativePath, "HEAD"), 0)) + } + + resp, err := client.RepositoryReplicas(ctx, &gitalypb.RepositoryReplicasRequest{Repository: testRepository}) + if tc.error == nil { + require.NoError(t, err) + + require.Equal(t, checksum.Checksum, resp.Primary.Checksum) + var checked []string + for _, secondary := range resp.GetReplicas() { + switch storage := secondary.GetRepository().GetStorageName(); storage { + case conf.VirtualStorages[0].Nodes[1].Storage: + require.NotEqual(t, checksum.Checksum, secondary.Checksum, "should not be equal since we added a commit") + checked = append(checked, storage) + case conf.VirtualStorages[0].Nodes[2].Storage: + require.Equal(t, checksum.Checksum, secondary.Checksum) + checked = append(checked, storage) + default: + require.FailNow(t, "unexpected storage: %q", storage) + } + } + require.ElementsMatch(t, []string{conf.VirtualStorages[0].Nodes[1].Storage, conf.VirtualStorages[0].Nodes[2].Storage}, checked) + } else { + require.EqualError(t, err, tc.error.Error()) + } + }) } - require.ElementsMatch(t, []string{conf.VirtualStorages[0].Nodes[1].Storage, conf.VirtualStorages[0].Nodes[2].Storage}, checked) } diff --git a/internal/praefect/service/info/repositories.go b/internal/praefect/service/info/repositories.go index 6b00a4f03d1..e1c61876e14 100644 --- a/internal/praefect/service/info/repositories.go +++ 
b/internal/praefect/service/info/repositories.go @@ -3,11 +3,14 @@ package info import ( "context" "fmt" + "strings" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // RepositoryReplicas returns a list of repositories that includes the checksum of the primary as well as the replicas @@ -74,6 +77,16 @@ func (s *Server) RepositoryReplicas(ctx context.Context, in *gitalypb.Repository } if err := g.Wait(); err != nil { + var datalossReplicas []string + for i, replica := range resp.Replicas { + if replica == nil { + datalossReplicas = append(datalossReplicas, secondaries[i]) + } + } + + if helper.GrpcCode(err) == codes.DataLoss { + return nil, status.Errorf(codes.DataLoss, "replica on storage, which named %s does not yet exist on the host", strings.Join(datalossReplicas, ", ")) + } return nil, helper.ErrInternal(err) } -- GitLab