From d11a4b140b1a555aac113a4733203c319e1a719e Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Wed, 20 Aug 2025 18:36:33 +0200 Subject: [PATCH] reftable: Add a recovery middleware --- internal/cli/gitaly/serve.go | 4 + .../migration/reftable/recovery_middleware.go | 164 ++++++++++++++++++ .../reftable/recovery_middleware_test.go | 153 ++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware.go create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware_test.go diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 62710e00b0b..d24b121ebeb 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -548,12 +548,16 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error { } defer nodeMgr.Close() + reftableRecovery := reftable.NewRecoveryMiddleware(protoregistry.GitalyProtoPreregistered, node, locator, localrepoFactory) recoveryMiddleware := storagemgr.NewTransactionRecoveryMiddleware(protoregistry.GitalyProtoPreregistered, nodeMgr) + txMiddleware = server.TransactionMiddleware{ UnaryInterceptors: []grpc.UnaryServerInterceptor{ + reftableRecovery.UnaryServerInterceptor(), recoveryMiddleware.UnaryServerInterceptor(), }, StreamInterceptors: []grpc.StreamServerInterceptor{ + reftableRecovery.StreamServerInterceptor(), recoveryMiddleware.StreamServerInterceptor(), }, } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware.go new file mode 100644 index 00000000000..5bc357c2f13 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware.go @@ -0,0 +1,164 @@ +package reftable + +import ( + "context" + "errors" + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + 
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
+	migrationid "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration/id"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/v16/middleware"
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/proto"
+)
+
+// RecoveryMiddleware is a gRPC middleware that migrates the repository targeted by a repository scoped RPC from
+// the reftable reference backend back to the files backend before the RPC is handled. Requests that don't resolve
+// to an existing repository on a configured storage are passed through to the handler untouched.
+type RecoveryMiddleware struct {
+	registry         *protoregistry.Registry
+	node             storage.Node
+	locator          storage.Locator
+	migrationHandler migrationHandler
+}
+
+// NewRecoveryMiddleware returns a new RecoveryMiddleware.
+func NewRecoveryMiddleware(
+	registry *protoregistry.Registry,
+	node storage.Node,
+	locator storage.Locator,
+	localRepoFactory localrepo.Factory,
+) *RecoveryMiddleware {
+	return &RecoveryMiddleware{
+		registry: registry,
+		node:     node,
+		locator:  locator,
+		migrationHandler: &refBackendMigrator{
+			migration.NewReferenceBackendMigration(migrationid.Reftable, git.ReferenceBackendFiles, localRepoFactory, nil),
+		},
+	}
+}
+
+// UnaryServerInterceptor returns a unary interceptor for the middleware.
+func (mw *RecoveryMiddleware) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
+		if _, ok := storagemgr.NonTransactionalRPCs[info.FullMethod]; ok {
+			// Non-transactional RPCs don't target a repository, so there is no reference backend to revert.
+			return handler(ctx, req)
+		}
+
+		methodInfo, err := mw.registry.LookupMethod(info.FullMethod)
+		if err != nil {
+			return nil, fmt.Errorf("lookup method: %w", err)
+		}
+
+		if err := mw.revertReftable(ctx, methodInfo, req.(proto.Message)); err != nil {
+			return nil, fmt.Errorf("revert reftable: %w", err)
+		}
+
+		return handler(ctx, req)
+	}
+}
+
+// StreamServerInterceptor returns a stream interceptor for the middleware.
+func (mw *RecoveryMiddleware) StreamServerInterceptor() grpc.StreamServerInterceptor {
+	return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		ctx := ss.Context()
+
+		if _, ok := storagemgr.NonTransactionalRPCs[info.FullMethod]; ok {
+			// Non-transactional RPCs don't target a repository. Pass srv (not ctx) through to the handler.
+			return handler(srv, ss)
+		}
+
+		methodInfo, err := mw.registry.LookupMethod(info.FullMethod)
+		if err != nil {
+			return fmt.Errorf("lookup method: %w", err)
+		}
+
+		req := methodInfo.NewRequest()
+		if err := ss.RecvMsg(req); err != nil {
+			// All of the repository scoped streaming RPCs send the repository in the first message.
+			// If we fail to read the first message, we'll just let the handler handle it.
+			return handler(srv, middleware.NewPeekedStream(ctx, nil, err, ss))
+		}
+
+		if err := mw.revertReftable(ctx, methodInfo, req); err != nil {
+			return fmt.Errorf("revert reftable: %w", err)
+		}
+
+		return handler(srv, middleware.NewPeekedStream(ctx, req, nil, ss))
+	}
+}
+
+// revertReftable migrates the repository targeted by req to the files reference backend if it uses another backend.
+// The migration runs in a write transaction that is committed on success and rolled back if migrating fails.
+func (mw *RecoveryMiddleware) revertReftable(ctx context.Context, methodInfo protoregistry.MethodInfo, req proto.Message) error {
+	if methodInfo.Scope != protoregistry.ScopeRepository {
+		// Only repository scoped RPCs target a repository whose reference backend could need reverting.
+		return nil
+	}
+
+	targetRepoProto, err := methodInfo.TargetRepo(req)
+	if err != nil {
+		if errors.Is(err, protoregistry.ErrRepositoryFieldNotFound) {
+			// Without a repository there is nothing to revert. Let the handler deal with the request.
+			return nil
+		}
+
+		return fmt.Errorf("target repo: %w", err)
+	}
+
+	storageName := targetRepoProto.GetStorageName()
+	relativePath := targetRepoProto.GetRelativePath()
+
+	storageHandle, err := mw.node.GetStorage(storageName)
+	if err != nil {
+		if errors.Is(err, storage.ErrStorageNotFound) {
+			// The storage isn't configured, so there is nothing to revert. Let the handler deal with it.
+			return nil
+		}
+
+		return fmt.Errorf("get storage: %w", err)
+	}
+
+	path, err := mw.locator.GetRepoPath(ctx, targetRepoProto)
+	if err != nil {
+		if errors.Is(err, storage.ErrRepositoryNotFound) {
+			// The repository doesn't exist (yet), e.g. for CreateRepository, so there is nothing to revert.
+			return nil
+		}
+
+		return fmt.Errorf("get repo path: %w", err)
+	}
+
+	refBackend, err := gitcmd.DetectReferenceBackend(ctx, path)
+	if err != nil {
+		return fmt.Errorf("get reference backend: %w", err)
+	}
+
+	// The repository already uses the files backend, so there is nothing to migrate.
+	if refBackend.Name == git.ReferenceBackendFiles.Name {
+		return nil
+	}
+
+	tx, err := storageHandle.Begin(ctx, storage.TransactionOptions{ReadOnly: false, RelativePath: relativePath})
+	if err != nil {
+		return fmt.Errorf("begin: %w", err)
+	}
+
+	if err = mw.migrationHandler.Migrate(ctx, tx, storageName, relativePath); err != nil {
+		return errors.Join(fmt.Errorf("migrate: %w", err), tx.Rollback(ctx))
+	}
+
+	if _, err = tx.Commit(ctx); err != nil {
+		return fmt.Errorf("commit: %w", err)
+	}
+
+	return nil
+}
diff --git
a/internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware_test.go
new file mode 100644
index 00000000000..5df61e1c3e7
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/recovery_middleware_test.go
@@ -0,0 +1,158 @@
+package reftable
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/proto"
+)
+
+// mockServerStream is a grpc.ServerStream whose Context and RecvMsg methods are served by the given callbacks.
+// Every other method is promoted from the embedded ServerStream, which the test leaves nil, so calling one panics.
+type mockServerStream struct {
+	grpc.ServerStream
+	context func() context.Context
+	recvMsg func(any) error
+}
+
+func (ss mockServerStream) Context() context.Context {
+	return ss.context()
+}
+
+func (ss mockServerStream) RecvMsg(m any) error {
+	return ss.recvMsg(m)
+}
+
+// TestReftableRecoveryMiddleware verifies that the unary and stream interceptors pass the request through to the
+// handler unchanged and leave the target repository on the files reference backend.
+func TestReftableRecoveryMiddleware(t *testing.T) {
+	ctx := testhelper.Context(t)
+	cfg := testcfg.Build(t)
+	logger := testhelper.SharedLogger(t)
+
+	dbMgr, err := databasemgr.NewDBManager(
+		ctx,
+		cfg.Storages,
+		keyvalue.NewBadgerStore,
+		helper.NewTimerTickerFactory(time.Minute),
+		logger,
+	)
+	require.NoError(t, err)
+	defer dbMgr.Close()
+
+	catfileCache := catfile.NewCache(cfg)
+	t.Cleanup(catfileCache.Stop)
+
+	cmdFactory := gittest.NewCommandFactory(t, cfg)
+	locator := config.NewLocator(cfg)
+	localRepoFactory := localrepo.NewFactory(logger, locator, cmdFactory, catfileCache)
+
+	partitionFactoryOptions := []partition.FactoryOption{
+		partition.WithCmdFactory(cmdFactory),
+		partition.WithRepoFactory(localRepoFactory),
+		partition.WithMetrics(partition.NewMetrics(nil)),
+		partition.WithRaftConfig(cfg.Raft),
+	}
+
+	partitionFactory := partition.NewFactory(partitionFactoryOptions...)
+
+	ptnMgr, err := node.NewManager(cfg.Storages, storagemgr.NewFactory(
+		logger, dbMgr, partitionFactory, config.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus),
+	))
+	require.NoError(t, err)
+	defer ptnMgr.Close()
+
+	mw := NewRecoveryMiddleware(protoregistry.GitalyProtoPreregistered, ptnMgr, locator, localRepoFactory)
+
+	t.Run("unary interceptor", func(t *testing.T) {
+		repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+			SkipCreationViaService: true,
+		})
+		gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master"))
+
+		tReq := &gitalypb.CreateRepositoryRequest{
+			Repository: &gitalypb.Repository{
+				StorageName:  repoProto.GetStorageName(),
+				RelativePath: repoProto.GetRelativePath(),
+			},
+		}
+		tResp := &gitalypb.CreateRepositoryResponse{}
+
+		resp, err := mw.UnaryServerInterceptor()(
+			ctx,
+			tReq,
+			&grpc.UnaryServerInfo{FullMethod: gitalypb.RepositoryService_CreateRepository_FullMethodName},
+			func(ctx context.Context, req any) (any, error) {
+				require.Equal(t, tReq, req)
+				return tResp, nil
+			},
+		)
+		require.NoError(t, err)
+		require.Equal(t, tResp, resp)
+
+		repo := localrepo.NewTestRepo(t, cfg, repoProto)
+		backend, err := repo.ReferenceBackend(ctx)
+		require.NoError(t, err)
+		require.Equal(t, git.ReferenceBackendFiles.Name, backend.Name)
+	})
+
+	t.Run("stream interceptor", func(t *testing.T) {
+		repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+			SkipCreationViaService: true,
+		})
+		gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master"))
+
+		tReq := &gitalypb.CreateRepositoryFromBundleRequest{
+			Repository: &gitalypb.Repository{
+				StorageName:  repoProto.GetStorageName(),
+				RelativePath: repoProto.GetRelativePath(),
+			},
+		}
+
+		err := mw.StreamServerInterceptor()(
+			nil,
+			mockServerStream{
+				context: func() context.Context { return ctx },
+				recvMsg: func(m any) error {
+					marshaled, err := proto.Marshal(tReq)
+					require.NoError(t, err)
+
+					return proto.Unmarshal(marshaled, m.(proto.Message))
+				},
+			},
+			&grpc.StreamServerInfo{FullMethod: gitalypb.RepositoryService_CreateRepositoryFromBundle_FullMethodName},
+			func(srv any, stream grpc.ServerStream) error {
+				req := proto.Clone(tReq)
+				proto.Reset(req)
+
+				require.NoError(t, stream.RecvMsg(req))
+				testhelper.ProtoEqual(t, tReq, req)
+				return nil
+			},
+		)
+		require.NoError(t, err)
+
+		repo := localrepo.NewTestRepo(t, cfg, repoProto)
+		backend, err := repo.ReferenceBackend(ctx)
+		require.NoError(t, err)
+		require.Equal(t, git.ReferenceBackendFiles.Name, backend.Name)
+	})
+}
-- 
GitLab