diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index c89654f6ab284738ff79ef3544b413f638487fa1..b0f362616e38be4bf5e129137efce8f0afd8b895 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -2,6 +2,8 @@ package praefect import ( "math/rand" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/require" @@ -23,137 +25,172 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestInfoService_RepositoryReplicas(t *testing.T) { t.Parallel() - - var cfgs []gconfig.Cfg - var cfgNodes []*config.Node storages := []string{"g-1", "g-2", "g-3"} - for i, storage := range storages { - cfg := testcfg.Build(t, testcfg.WithStorages(storage)) - cfgs = append(cfgs, cfg) - cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( - deps.GetCfg(), - deps.GetRubyServer(), - deps.GetLocator(), - deps.GetTxManager(), - deps.GetGitCmdFactory(), - deps.GetCatfileCache(), - deps.GetConnsPool(), - deps.GetGit2goExecutor(), - deps.GetHousekeepingManager(), - )) - }, testserver.WithDisablePraefect()) - cfgNodes = append(cfgNodes, &config.Node{ - Storage: cfgs[i].Storages[0].Name, - Address: cfgs[i].SocketPath, - Token: cfgs[i].Auth.Token, - }) - } - - const virtualStorage = "default" - conf := config.Config{ - VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: cfgNodes}}, - Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}, - } - - ctx := testhelper.Context(t) - - db := testdb.New(t) - logger := testhelper.NewDiscardingLogEntry(t) - // the only thing used from the config is the grpc_latency_buckets which is not relevant for the test - txManager := transactions.NewManager(config.Config{}) - sidechannelRegistry := sidechannel.NewRegistry() - nodeSet, err := DialNodes( - ctx, - conf.VirtualStorages, - protoregistry.GitalyProtoPreregistered, - nil, - backchannel.NewClientHandshaker( - logger, - NewBackchannelServerFactory( - logger, - transaction.NewServer(txManager), + for _, tc := range []struct { + desc string + removeRepoStorageIndex []int + error error + }{ + { + desc: "Get all checksums successfully. And has a replica, which has a newer version repository.", + }, + { + desc: "Faild to get all checksums. Because one replica is not replicated yet", + removeRepoStorageIndex: []int{1}, + error: status.Errorf(codes.DataLoss, "replica on storage, which named g-2 does not yet exist on the host"), + }, + { + desc: "Faild to get all checksums. Because two replica is not replicated yet", + removeRepoStorageIndex: []int{1, 2}, + error: status.Errorf(codes.DataLoss, "replica on storage, which named g-2, g-3 does not yet exist on the host"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var cfgs []gconfig.Cfg + var cfgNodes []*config.Node + for i, storage := range storages { + cfg := testcfg.Build(t, testcfg.WithStorages(storage)) + cfgs = append(cfgs, cfg) + cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( + deps.GetCfg(), + deps.GetRubyServer(), + deps.GetLocator(), + deps.GetTxManager(), + deps.GetGitCmdFactory(), + deps.GetCatfileCache(), + deps.GetConnsPool(), + deps.GetGit2goExecutor(), + deps.GetHousekeepingManager(), + )) + }, testserver.WithDisablePraefect()) + cfgNodes = append(cfgNodes, &config.Node{ + Storage: cfgs[i].Storages[0].Name, + Address: cfgs[i].SocketPath, + Token: cfgs[i].Auth.Token, + }) + } + + const virtualStorage = "default" + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: cfgNodes}}, + Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}, + } + + ctx := testhelper.Context(t) + + db := testdb.New(t) + logger := testhelper.NewDiscardingLogEntry(t) + // the only thing used from the config is the grpc_latency_buckets which is not relevant for the test + txManager := transactions.NewManager(config.Config{}) + sidechannelRegistry := sidechannel.NewRegistry() + nodeSet, err := DialNodes( + ctx, + conf.VirtualStorages, + protoregistry.GitalyProtoPreregistered, + nil, + backchannel.NewClientHandshaker( + logger, + NewBackchannelServerFactory( + logger, + transaction.NewServer(txManager), + sidechannelRegistry, + ), + ), sidechannelRegistry, - ), - ), - sidechannelRegistry, - ) - require.NoError(t, err) - t.Cleanup(nodeSet.Close) - - // Use a transaction in the elector itself to avoid flakiness due to the health timeout. We mark - // only the first repo has healthy so the elector picks it always as the primary. - tx := db.Begin(t) - t.Cleanup(func() { tx.Rollback(t) }) - testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ - "praefect-0": {virtualStorage: storages[0:1]}, - }) - elector := nodes.NewPerRepositoryElector(tx) - - conns := nodeSet.Connections() - rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) - - cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ - withConnections: conns, - withRepoStore: rs, - withRouter: NewPerRepositoryRouter( - conns, - elector, - StaticHealthChecker{virtualStorage: storages}, - NewLockedRandom(rand.New(rand.NewSource(0))), - rs, - datastore.NewAssignmentStore(db, conf.StorageNames()), - rs, - conf.DefaultReplicationFactors(), - ), - withPrimaryGetter: elector, - withTxMgr: txManager, - }) - // use cleanup to close the connections as gittest.CreateRepository will still use the connection - // for clean up after the test. - t.Cleanup(cleanup) - - client := gitalypb.NewPraefectInfoServiceClient(cc) - - testRepository, testRepoPath := gittest.CreateRepository(ctx, t, - // The helper was implemented with the test server in mind. Here we need use the virtual storage's name - // as the storage and the path of the storage we want to modify the replica in. - gconfig.Cfg{Storages: []gconfig.Storage{{Name: virtualStorage, Path: cfgs[1].Storages[0].Path}}}, - gittest.CreateRepositoryConfig{Seed: gittest.SeedGitLabTest, ClientConn: cc}, - ) - - // create a commit in the second replica so we can check that its checksum is different than the primary - gittest.WriteCommit(t, cfgs[1], testRepoPath, gittest.WithBranch("master")) - - // Increment the generation of the unmodified repositories so the below CalculateChecksum calls goes to one of them - // as the test expects the primary to have that checksum. - require.NoError(t, rs.IncrementGeneration(ctx, 1, cfgs[0].Storages[0].Name, []string{cfgs[2].Storages[0].Name})) - - // CalculateChecksum through praefect will get the checksum of the primary - checksum, err := gitalypb.NewRepositoryServiceClient(cc).CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: testRepository}) - require.NoError(t, err) - - resp, err := client.RepositoryReplicas(ctx, &gitalypb.RepositoryReplicasRequest{Repository: testRepository}) - require.NoError(t, err) - - require.Equal(t, checksum.Checksum, resp.Primary.Checksum) - var checked []string - for _, secondary := range resp.GetReplicas() { - switch storage := secondary.GetRepository().GetStorageName(); storage { - case conf.VirtualStorages[0].Nodes[1].Storage: - require.NotEqual(t, checksum.Checksum, secondary.Checksum, "should not be equal since we added a commit") - checked = append(checked, storage) - case conf.VirtualStorages[0].Nodes[2].Storage: - require.Equal(t, checksum.Checksum, secondary.Checksum) - checked = append(checked, storage) - default: - require.FailNow(t, "unexpected storage: %q", storage) - } + ) + require.NoError(t, err) + t.Cleanup(nodeSet.Close) + + // Use a transaction in the elector itself to avoid flakiness due to the health timeout. We mark + // only the first repo has healthy so the elector picks it always as the primary. + tx := db.Begin(t) + t.Cleanup(func() { tx.Rollback(t) }) + testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ + "praefect-0": {virtualStorage: storages[0:1]}, + }) + elector := nodes.NewPerRepositoryElector(tx) + + conns := nodeSet.Connections() + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ + withConnections: conns, + withRepoStore: rs, + withRouter: NewPerRepositoryRouter( + conns, + elector, + StaticHealthChecker{virtualStorage: storages}, + NewLockedRandom(rand.New(rand.NewSource(0))), + rs, + datastore.NewAssignmentStore(db, conf.StorageNames()), + rs, + conf.DefaultReplicationFactors(), + ), + withPrimaryGetter: elector, + withTxMgr: txManager, + }) + // use cleanup to close the connections as gittest.CreateRepository will still use the connection + // for clean up after the test. + t.Cleanup(cleanup) + + client := gitalypb.NewPraefectInfoServiceClient(cc) + + testRepository, testRepoPath := gittest.CreateRepository(ctx, t, + // The helper was implemented with the test server in mind. Here we need use the virtual storage's name + // as the storage and the path of the storage we want to modify the replica in. + gconfig.Cfg{Storages: []gconfig.Storage{{Name: virtualStorage, Path: cfgs[1].Storages[0].Path}}}, + gittest.CreateRepositoryConfig{Seed: gittest.SeedGitLabTest, ClientConn: cc}, + ) + + // create a commit in the second replica so we can check that its checksum is different than the primary + gittest.WriteCommit(t, cfgs[1], testRepoPath, gittest.WithBranch("master")) + + // Increment the generation of the unmodified repositories so the below CalculateChecksum calls goes to one of them + // as the test expects the primary to have that checksum. + require.NoError(t, rs.IncrementGeneration(ctx, 1, cfgs[0].Storages[0].Name, []string{cfgs[2].Storages[0].Name})) + + // CalculateChecksum through praefect will get the checksum of the primary + checksum, err := gitalypb.NewRepositoryServiceClient(cc).CalculateChecksum(ctx, &gitalypb.CalculateChecksumRequest{Repository: testRepository}) + require.NoError(t, err) + var paths []string + for _, cfg := range cfgs { + paths = append(paths, cfg.Storages[0].Path) + } + // Force an empty HEAD file + for _, storageIndex := range tc.removeRepoStorageIndex { + require.NoError(t, os.Truncate(filepath.Join(paths[storageIndex], testRepository.RelativePath, "HEAD"), 0)) + } + + resp, err := client.RepositoryReplicas(ctx, &gitalypb.RepositoryReplicasRequest{Repository: testRepository}) + if tc.error == nil { + require.NoError(t, err) + + require.Equal(t, checksum.Checksum, resp.Primary.Checksum) + var checked []string + for _, secondary := range resp.GetReplicas() { + switch storage := secondary.GetRepository().GetStorageName(); storage { + case conf.VirtualStorages[0].Nodes[1].Storage: + require.NotEqual(t, checksum.Checksum, secondary.Checksum, "should not be equal since we added a commit") + checked = append(checked, storage) + case conf.VirtualStorages[0].Nodes[2].Storage: + require.Equal(t, checksum.Checksum, secondary.Checksum) + checked = append(checked, storage) + default: + require.FailNow(t, "unexpected storage: %q", storage) + } + } + require.ElementsMatch(t, []string{conf.VirtualStorages[0].Nodes[1].Storage, conf.VirtualStorages[0].Nodes[2].Storage}, checked) + } else { + require.EqualError(t, err, tc.error.Error()) + } + }) } - require.ElementsMatch(t, []string{conf.VirtualStorages[0].Nodes[1].Storage, conf.VirtualStorages[0].Nodes[2].Storage}, checked) } diff --git a/internal/praefect/service/info/repositories.go b/internal/praefect/service/info/repositories.go index 6b00a4f03d1ec2c0865be2a7495008602f1730b5..e1c61876e14307e7591fbcf5020cf648ca470e1c 100644 --- a/internal/praefect/service/info/repositories.go +++ b/internal/praefect/service/info/repositories.go @@ -3,11 +3,14 @@ package info import ( "context" "fmt" + "strings" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // RepositoryReplicas returns a list of repositories that includes the checksum of the primary as well as the replicas @@ -74,6 +77,16 @@ func (s *Server) RepositoryReplicas(ctx context.Context, in *gitalypb.Repository } if err := g.Wait(); err != nil { + var datalossReplicas []string + for i, replica := range resp.Replicas { + if replica == nil { + datalossReplicas = append(datalossReplicas, secondaries[i]) + } + } + + if helper.GrpcCode(err) == codes.DataLoss { + return nil, status.Errorf(codes.DataLoss, "replica on storage, which named %s does not yet exist on the host", strings.Join(datalossReplicas, ", ")) + } return nil, helper.ErrInternal(err) }