From a8396fa9cb9d018e88967f6ae8d4f142be457316 Mon Sep 17 00:00:00 2001 From: Emily Chui Date: Fri, 5 Sep 2025 16:29:15 +1000 Subject: [PATCH 1/4] Move packrefs, repack out of transactions --- internal/git/housekeeping/config/config.go | 68 +++++++++++++++++++ internal/git/housekeeping/manager/manager.go | 8 +++ .../manager/optimize_repository.go | 4 +- internal/git/housekeeping/manager/repack.go | 22 ++++++ internal/gitaly/storage/storage.go | 12 ++-- .../storagemgr/partition/testhelper_test.go | 10 +-- .../partition/transaction_manager.go | 45 ++++++------ 7 files changed, 138 insertions(+), 31 deletions(-) create mode 100644 internal/git/housekeeping/manager/repack.go diff --git a/internal/git/housekeeping/config/config.go b/internal/git/housekeeping/config/config.go index 56db3de82c1..6415bbb109c 100644 --- a/internal/git/housekeeping/config/config.go +++ b/internal/git/housekeeping/config/config.go @@ -2,8 +2,76 @@ package config import ( "time" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/reftable" ) +// RunHousekeeping models housekeeping tasks. It is supposed to handle housekeeping tasks for repositories +// such as the cleanup of unneeded files and optimizations for the repository's data structures. +type RunHousekeeping struct { + PackRefs *runPackRefs + Repack *runRepack + WriteCommitGraphs *writeCommitGraphs + RunOffloading *runOffloading + RunRehydrating *runRehydrating +} + +// runPackRefs models refs packing housekeeping task. It packs heads and tags for efficient repository access. +type runPackRefs struct { + // PrunedRefs contain a list of references pruned by the `git-pack-refs` command. They are used + // for comparing to the ref list of the destination repository + PrunedRefs map[git.ReferenceName]struct{} + // emptyDirectories contain a list of empty directories in the transaction snapshot. It is used + // to delete those directories during pack refs' post stage. 
+ emptyDirectories map[string]struct{} + // reftablesBefore contains the data in 'tables.list' before the compaction. This is used to + // compare with the destination repositories 'tables.list'. + reftablesBefore []reftable.Name + // reftablesAfter contains the data in 'tables.list' after the compaction. This is used for + // generating the combined 'tables.list' during verification. + reftablesAfter []reftable.Name +} + +// NewRunPackRefs creates a new runPackRefs instance with initialized maps. +func NewRunPackRefs() *runPackRefs { + return &runPackRefs{ + PrunedRefs: map[git.ReferenceName]struct{}{}, + emptyDirectories: map[string]struct{}{}, + } +} + +// runRepack models repack housekeeping task. We support multiple repacking strategies. At this stage, the outside +// scheduler determines which strategy to use. The transaction manager is responsible for executing it. In the future, +// we want to make housekeeping smarter by migrating housekeeping scheduling responsibility to this manager. That work +// is tracked in https://gitlab.com/gitlab-org/gitaly/-/issues/5709. +type runRepack struct { + // config tells which strategy and baggaged options. + config RepackObjectsConfig +} + +// NewRunRepack creates a new runRepack instance with the given configuration. +func NewRunRepack(config RepackObjectsConfig) *runRepack { + return &runRepack{ + config: config, + } +} + +// writeCommitGraphs models a commit graph update. +type writeCommitGraphs struct { + // config includes the configs for writing commit graph. + config WriteCommitGraphConfig +} + +// runOffloading models offloading tasks. It stores configuration information needed to offload a repository. +type runOffloading struct { + config OffloadingConfig +} + +type runRehydrating struct { + prefix string +} + // WriteCommitGraphConfig contains configuration that can be passed to WriteCommitGraph to alter its // default behaviour. 
type WriteCommitGraphConfig struct { diff --git a/internal/git/housekeeping/manager/manager.go b/internal/git/housekeeping/manager/manager.go index b08d78d4d37..983bef98995 100644 --- a/internal/git/housekeeping/manager/manager.go +++ b/internal/git/housekeeping/manager/manager.go @@ -28,6 +28,14 @@ type Manager interface { // back to local storage. The prefix parameter specifies the object prefix in the remote storage // where the repository objects are stored. RehydrateRepository(context.Context, *localrepo.Repo, string) error + // Repack runs object repacking housekeeping task when the transaction commits. If this + // is called, the transaction is limited to running only other housekeeping tasks. No other + // updates are allowed. + Repack(config.RepackObjectsConfig, *config.RunHousekeeping) + // PackRefs runs reference repacking housekeeping when the transaction commits. If this + // is called, the transaction is limited to running only other housekeeping tasks. No other + // updates are allowed. + PackRefs(*config.RunHousekeeping) } // repositoryState holds the housekeeping state for individual repositories. 
This structure can be diff --git a/internal/git/housekeeping/manager/optimize_repository.go b/internal/git/housekeeping/manager/optimize_repository.go index c60906413d2..7d8a1d50b3a 100644 --- a/internal/git/housekeeping/manager/optimize_repository.go +++ b/internal/git/housekeeping/manager/optimize_repository.go @@ -317,7 +317,7 @@ func (m *RepositoryManager) optimizeRepositoryWithTransaction( var errPackReferences error if packRefsNeeded { if err := m.runInTransaction(ctx, "housekeeping/pack-refs", false, repo, func(ctx context.Context, tx storage.Transaction, repo *localrepo.Repo) error { - tx.PackRefs() + m.PackRefs(tx.GetHousekeepingTasks()) return nil }); err != nil { errPackReferences = fmt.Errorf("run reference packing: %w", err) @@ -328,7 +328,7 @@ func (m *RepositoryManager) optimizeRepositoryWithTransaction( if repackNeeded || writeCommitGraphNeeded { if err := m.runInTransaction(ctx, "housekeeping/pack-objects", false, repo, func(ctx context.Context, tx storage.Transaction, repo *localrepo.Repo) error { if repackNeeded { - tx.Repack(repackCfg) + m.Repack(repackCfg, tx.GetHousekeepingTasks()) } if writeCommitGraphNeeded { diff --git a/internal/git/housekeeping/manager/repack.go b/internal/git/housekeeping/manager/repack.go new file mode 100644 index 00000000000..3423c497fd9 --- /dev/null +++ b/internal/git/housekeeping/manager/repack.go @@ -0,0 +1,22 @@ +package manager + +import ( + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config" +) + +// Repack sets repacking housekeeping task as a part of the transaction. +func (m *RepositoryManager) Repack(repackConfig config.RepackObjectsConfig, runHousekeeping *config.RunHousekeeping) { + if runHousekeeping == nil { + runHousekeeping = &config.RunHousekeeping{} + } + runHousekeeping.Repack = config.NewRunRepack(repackConfig) +} + +// PackRefs sets pack-refs housekeeping task as a part of the transaction. The transaction can only runs other +// housekeeping tasks in the same transaction. 
No other updates are allowed. +func (m *RepositoryManager) PackRefs(runHousekeeping *config.RunHousekeeping) { + if runHousekeeping == nil { + runHousekeeping = &config.RunHousekeeping{} + } + runHousekeeping.PackRefs = config.NewRunPackRefs() +} diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 1bda040e519..687b6b946a2 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -114,11 +114,11 @@ type Transaction interface { // PackRefs runs reference repacking housekeeping when the transaction commits. If this // is called, the transaction is limited to running only other housekeeping tasks. No other // updates are allowed. - PackRefs() - // Repack runs object repacking housekeeping task when the transaction commits. If this - // is called, the transaction is limited to running only other housekeeping tasks. No other - // updates are allowed. - Repack(housekeepingcfg.RepackObjectsConfig) + // PackRefs() + // // Repack runs object repacking housekeeping task when the transaction commits. If this + // // is called, the transaction is limited to running only other housekeeping tasks. No other + // // updates are allowed. + // Repack(housekeepingcfg.RepackObjectsConfig) // WriteCommitGraphs rewrites the commit graphs when the transaction commits. If this // is called, the transaction is limited to running only other housekeeping tasks. No other // updates are allowed. @@ -136,6 +136,8 @@ type Transaction interface { // and schedules the rehydrating operation to execute when the transaction commits. // It stores the bucket prefix in the transaction's runRehydrating struct. SetRehydratingConfig(string) + // GetHousekeepingTasks + GetHousekeepingTasks() *housekeepingcfg.RunHousekeeping } // BeginOptions are used to configure a transaction that is being started. 
diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 902d0a639f9..a5d2bd3f81e 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -29,6 +29,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" housekeepingcfg "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config" + housekeepingmgr "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/manager" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/objectpool" "gitlab.com/gitlab-org/gitaly/v16/internal/git/packfile" @@ -1214,7 +1215,6 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas newMetrics := func() Metrics { return NewMetrics(housekeeping.NewMetrics(setup.Config.Prometheus)) } - var raftFactory raftmgr.RaftReplicaFactory var raftEntryRecorder *raftmgr.ReplicaEntryRecorder clusterID := uuid.New().String() @@ -1616,14 +1616,14 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas )) case RunPackRefs: require.Contains(t, openTransactions, step.TransactionID, "test error: pack-refs housekeeping task aborted on committed before beginning it") - + housekeeper := housekeepingmgr.New(setup.Config.Prometheus, logger, nil, nil) transaction := openTransactions[step.TransactionID] - transaction.PackRefs() + housekeeper.PackRefs(transaction.GetHousekeepingTasks()) case RunRepack: require.Contains(t, openTransactions, step.TransactionID, "test error: repack housekeeping task aborted on committed before beginning it") - + housekeeper := housekeepingmgr.New(setup.Config.Prometheus, logger, nil, nil) transaction := openTransactions[step.TransactionID] - transaction.Repack(step.Config) + housekeeper.Repack(step.Config, 
transaction.GetHousekeepingTasks()) case WriteCommitGraphs: require.Contains(t, openTransactions, step.TransactionID, "test error: repack housekeeping task aborted on committed before beginning it") diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 59df0bae8d4..a108e6fb98c 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -198,7 +198,7 @@ type Transaction struct { referenceUpdates []git.ReferenceUpdates repositoryCreation *repositoryCreation deleteRepository bool - runHousekeeping *runHousekeeping + runHousekeeping *housekeepingcfg.RunHousekeeping // objectDependencies are the object IDs this transaction depends on in // the repository. The dependencies are used to guard against invalid packs @@ -744,24 +744,31 @@ func (txn *Transaction) DeleteRepository() { // PackRefs sets pack-refs housekeeping task as a part of the transaction. The transaction can only runs other // housekeeping tasks in the same transaction. No other updates are allowed. -func (txn *Transaction) PackRefs() { - if txn.runHousekeeping == nil { - txn.runHousekeeping = &runHousekeeping{} - } - txn.runHousekeeping.packRefs = &runPackRefs{ - PrunedRefs: map[git.ReferenceName]struct{}{}, - emptyDirectories: map[string]struct{}{}, - } -} +// func (txn *Transaction) PackRefs() { +// if txn.runHousekeeping == nil { +// txn.runHousekeeping = &runHousekeeping{} +// } +// txn.runHousekeeping.packRefs = &runPackRefs{ +// PrunedRefs: map[git.ReferenceName]struct{}{}, +// emptyDirectories: map[string]struct{}{}, +// } +// } + +// // Repack sets repacking housekeeping task as a part of the transaction. 
+// +// func (txn *Transaction) Repack(config housekeepingcfg.RepackObjectsConfig) { +// if txn.runHousekeeping == nil { +// txn.runHousekeeping = &runHousekeeping{} +// } +// txn.runHousekeeping.repack = &runRepack{ +// config: config, +// } +// } +// -// Repack sets repacking housekeeping task as a part of the transaction. -func (txn *Transaction) Repack(config housekeepingcfg.RepackObjectsConfig) { - if txn.runHousekeeping == nil { - txn.runHousekeeping = &runHousekeeping{} - } - txn.runHousekeeping.repack = &runRepack{ - config: config, - } +// GetHousekeepingTasks returns the transactions housekeeping tasks +func (txn *Transaction) GetHousekeepingTasks() *housekeepingcfg.RunHousekeeping { + return txn.runHousekeeping } // WriteCommitGraphs enables the commit graph to be rewritten as part of the transaction. @@ -1561,7 +1568,7 @@ func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, tran // The work of adding pruned refs dump to `git-pack-refs` is tracked here: // https://gitlab.com/gitlab-org/git/-/issues/222 func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transaction *Transaction) error { - runPackRefs := transaction.runHousekeeping.packRefs + runPackRefs := transaction.GetHousekeepingTasks().PackRefs for _, lock := range []string{".new", ".lock"} { lockRelativePath := filepath.Join(transaction.relativePath, "packed-refs"+lock) lockAbsolutePath := filepath.Join(transaction.snapshot.Root(), lockRelativePath) -- GitLab From 651ef852ed5fd829d9d9ecb2eb2a8cdf4bde20bb Mon Sep 17 00:00:00 2001 From: Emily Chui Date: Tue, 23 Sep 2025 17:03:10 +1000 Subject: [PATCH 2/4] Expose runRepack Config as public --- internal/git/housekeeping/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/git/housekeeping/config/config.go b/internal/git/housekeeping/config/config.go index 6415bbb109c..28a9dbcbcae 100644 --- a/internal/git/housekeeping/config/config.go +++ 
b/internal/git/housekeeping/config/config.go @@ -47,7 +47,7 @@ func NewRunPackRefs() *runPackRefs { // is tracked in https://gitlab.com/gitlab-org/gitaly/-/issues/5709. type runRepack struct { // config tells which strategy and baggaged options. - config RepackObjectsConfig + Config RepackObjectsConfig } // NewRunRepack creates a new runRepack instance with the given configuration. -- GitLab From 33eb2ca046ffb9717c78c4af180533fee14ac473 Mon Sep 17 00:00:00 2001 From: Emily Chui Date: Tue, 23 Sep 2025 17:11:17 +1000 Subject: [PATCH 3/4] refactor getAbsolutePath to take in param --- .../partition/transaction_manager.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index a108e6fb98c..f10b28ac3e6 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -1303,7 +1303,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return nil } - if _, err := os.Stat(mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath())); err != nil { + if _, err := os.Stat(getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath(), mgr.storagePath)); err != nil { if !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("stat: %w", err) } @@ -1491,7 +1491,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra // why we don't track 'tables.list' operation here. 
func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, transaction *Transaction) error { runPackRefs := transaction.runHousekeeping.packRefs - repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) + repoPath := getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath(), mgr.storagePath) if err := allowReftableCompaction(repoPath); err != nil { return fmt.Errorf("allow reftable compaction: %w", err) @@ -1567,7 +1567,7 @@ func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, tran // Smaller repositories or ones that run housekeeping frequent won't have this issue. // The work of adding pruned refs dump to `git-pack-refs` is tracked here: // https://gitlab.com/gitlab-org/git/-/issues/222 -func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transaction *Transaction) error { +func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transaction *Transaction, storagePath string) error { runPackRefs := transaction.GetHousekeepingTasks().PackRefs for _, lock := range []string{".new", ".lock"} { lockRelativePath := filepath.Join(transaction.relativePath, "packed-refs"+lock) @@ -1587,7 +1587,8 @@ func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transac // First walk to collect the list of loose refs. 
looseReferences := make(map[git.ReferenceName]struct{}) - repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) + repoPath := getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath(), mgr.storagePath) + if err := filepath.WalkDir(filepath.Join(repoPath, "refs"), func(path string, entry fs.DirEntry, err error) error { if err != nil { return err @@ -1799,7 +1800,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned if refBackend == git.ReferenceBackendReftables || transaction.runHousekeeping != nil { if refBackend == git.ReferenceBackendReftables { if err := transaction.reftableRecorder.stageTables(ctx, - mgr.getAbsolutePath(ctx, transaction.relativePath), + getAbsolutePath(ctx, transaction.relativePath, mgr.storagePath), transaction, ); err != nil { return commitResult{error: fmt.Errorf("stage tables: %w", err)} @@ -1807,7 +1808,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned } if transaction.runHousekeeping != nil { - housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction, refBackend, objectHash.ZeroOID) + housekeepingEntry, err := verifyHousekeeping(ctx, transaction, mgr, refBackend, objectHash.ZeroOID) if err != nil { return commitResult{error: fmt.Errorf("verifying pack refs: %w", err)} } @@ -2101,7 +2102,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relativePath string, opts ...storage.GetRepoPathOption) (bool, error) { defer trace.StartRegion(ctx, "doesRepositoryExist").End() - stat, err := os.Stat(mgr.getAbsolutePath(ctx, relativePath, opts...)) + stat, err := os.Stat(getAbsolutePath(ctx, relativePath, mgr.storagePath, opts...)) if err != nil { if errors.Is(err, fs.ErrNotExist) { return false, nil @@ -2120,7 +2121,7 @@ func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relative // getAbsolutePath returns the 
absolute path of a relative path within the storage. // If the GetRepoPathOption UseRootStorage is set, it returns the repository’s original // path in the root storage (i.e., the storage defined in the Gitaly config). -func (mgr *TransactionManager) getAbsolutePath(ctx context.Context, relativePath string, opts ...storage.GetRepoPathOption) string { +func getAbsolutePath(ctx context.Context, relativePath string, storagePath string, opts ...storage.GetRepoPathOption) string { var cfg storage.GetRepoPathConfig for _, opt := range opts { opt(&cfg) @@ -2130,7 +2131,7 @@ func (mgr *TransactionManager) getAbsolutePath(ctx context.Context, relativePath return filepath.Join(txn.FS().Root(), relativePath) } - return filepath.Join(mgr.storagePath, relativePath) + return filepath.Join(storagePath, relativePath) } // packFilePath returns a log entry's pack file's absolute path in the wal files directory. -- GitLab From 02beaf7cdbbc620a0defe100ec621ebe49fa3f0c Mon Sep 17 00:00:00 2001 From: Emily Chui Date: Thu, 25 Sep 2025 15:32:02 +1000 Subject: [PATCH 4/4] move verifyHouseKeeping to housekeeping pkg --- .../manager/optimize_repository.go | 60 +++++ internal/git/housekeeping/transaction.go | 245 ++++++++++++++++++ .../transaction_manager_housekeeping.go | 136 +++++----- 3 files changed, 373 insertions(+), 68 deletions(-) create mode 100644 internal/git/housekeeping/transaction.go diff --git a/internal/git/housekeeping/manager/optimize_repository.go b/internal/git/housekeeping/manager/optimize_repository.go index 7d8a1d50b3a..6db40af6b14 100644 --- a/internal/git/housekeeping/manager/optimize_repository.go +++ b/internal/git/housekeeping/manager/optimize_repository.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "time" + "runtime/trace" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" @@ -15,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/tracing" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -520,3 +522,61 @@ func (m *RepositoryManager) CleanStaleData(ctx context.Context, repo *localrepo. return nil } + +// verifyHousekeeping verifies if all included housekeeping tasks can be performed. Although it's feasible for multiple +// housekeeping tasks running at the same time, it's not guaranteed they are conflict-free. So, we need to ensure there +// is no other concurrent housekeeping task. Each sub-task also needs specific verification. +func (m *RepositoryManager) verifyHousekeeping(ctx context.Context, transaction *partition.Transaction, mgr *partition.TransactionManager, refBackend git.ReferenceBackend, zeroOID git.ObjectID) (*gitalypb.LogEntry_Housekeeping, error) { + defer trace.StartRegion(ctx, "verifyHousekeeping").End() + + span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.verifyHousekeeping", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) + defer span.Finish() + + finishTimer := m.metrics.ReportTaskLatency("total", "verify") + defer finishTimer() + + // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN. + if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry, objectDependencies map[git.ObjectID]struct{}) error { + if entry.GetHousekeeping() != nil { + return errHousekeepingConflictConcurrent + } + if entry.GetRepositoryDeletion() != nil { + return errConflictRepositoryDeletion + } + + // Applying a repacking operation prunes all loose objects on application. If loose objects were concurrently introduced + // in the repository with the repacking operation, this could lead to corruption if we prune a loose object that is needed. 
+ // Transactions in general only introduce packs, not loose objects. The only exception to this currently is alternate + // unlinking operations where the objects of the alternate are hard linked into the member repository. This can technically + // still introduce loose objects into the repository and trigger this problem as the pools could still have loose objects + // in them until the first repack. + // + // Check if the repository was unlinked from an alternate concurrently. + for _, op := range entry.GetOperations() { + switch op := op.GetOperation().(type) { + case *gitalypb.LogEntry_Operation_RemoveDirectoryEntry_: + if string(op.RemoveDirectoryEntry.GetPath()) == stats.AlternatesFilePath(transaction.relativePath) { + return errConcurrentAlternateUnlink + } + } + } + + return nil + }); err != nil { + return nil, fmt.Errorf("walking committed entries: %w", err) + } + + packRefsEntry, err := mgr.verifyPackRefs(ctx, transaction, refBackend, zeroOID) + if err != nil { + return nil, fmt.Errorf("verifying pack refs: %w", err) + } + + if err := mgr.verifyRepacking(ctx, transaction); err != nil { + return nil, fmt.Errorf("verifying repacking: %w", err) + } + + return &gitalypb.LogEntry_Housekeeping{ + PackRefs: packRefsEntry, + }, nil +} \ No newline at end of file diff --git a/internal/git/housekeeping/transaction.go b/internal/git/housekeeping/transaction.go new file mode 100644 index 00000000000..26d2507db3d --- /dev/null +++ b/internal/git/housekeeping/transaction.go @@ -0,0 +1,245 @@ +package housekeeping + +import ( + "context" + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/tracing" +) + +// TransactionContext provides the necessary context for transaction-aware housekeeping operations. 
+type TransactionContext interface { + // SnapshotRepository returns the repository to operate on (snapshot in transaction context) + SnapshotRepository() *localrepo.Repo + // RecordFileChange records a file change in the transaction's WAL + RecordFileChange(absolutePath, relativePath string) error + // RecordFileRemoval records a file removal in the transaction's WAL + RecordFileRemoval(relativePath string) error + // RecordDirectoryChange records a directory change in the transaction's WAL + RecordDirectoryChange(relativePath string) error + // RecordDirectoryRemoval records a directory removal in the transaction's WAL + RecordDirectoryRemoval(relativePath string) error + // GetRelativePath returns the repository's relative path + GetRelativePath() string + // GetSnapshotRoot returns the snapshot root directory + GetSnapshotRoot() string +} + +// TransactionHousekeepingResult contains the results of transaction-aware housekeeping operations. +type TransactionHousekeepingResult struct { + PackRefsResult *PackRefsResult + RepackResult *RepackResult + CommitGraphResult *CommitGraphResult + OffloadingResult *OffloadingResult + RehydratingResult *RehydratingResult +} + +// PackRefsResult contains the results of pack-refs operation. +type PackRefsResult struct { + PrunedRefs map[git.ReferenceName]struct{} + EmptyDirectories map[string]struct{} + ReftablesBefore []string + ReftablesAfter []string +} + +// RepackResult contains the results of repack operation. +type RepackResult struct { + // Implementation details for repack results +} + +// CommitGraphResult contains the results of commit-graph operation. +type CommitGraphResult struct { + // Implementation details for commit-graph results +} + +// OffloadingResult contains the results of offloading operation. +type OffloadingResult struct { + UploadedFiles []string + Prefix string +} + +// RehydratingResult contains the results of rehydrating operation. 
+type RehydratingResult struct { + DownloadedFiles []string +} + +// PrepareTransactionHousekeeping performs all housekeeping operations for a transaction. +// This is the main entry point that replaces the transaction manager's prepareHousekeeping. +func PrepareTransactionHousekeeping( + ctx context.Context, + txnCtx TransactionContext, + tasks *config.RunHousekeeping, + metrics *Metrics, +) (*TransactionHousekeepingResult, error) { + if tasks == nil { + return nil, nil + } + + span, ctx := tracing.StartSpanIfHasParent(ctx, "housekeeping.PrepareTransactionHousekeeping", nil) + defer span.Finish() + + finishTimer := metrics.ReportTaskLatency("total", "prepare") + defer finishTimer() + + result := &TransactionHousekeepingResult{} + + // Prepare pack-refs + if tasks.PackRefs != nil { + packRefsResult, err := prepareTransactionPackRefs(ctx, txnCtx, tasks.PackRefs, metrics) + if err != nil { + return nil, fmt.Errorf("preparing pack-refs: %w", err) + } + result.PackRefsResult = packRefsResult + } + + // Prepare repacking + if tasks.Repack != nil { + repackResult, err := prepareTransactionRepack(ctx, txnCtx, tasks.Repack, metrics) + if err != nil { + return nil, fmt.Errorf("preparing repack: %w", err) + } + result.RepackResult = repackResult + } + + // Prepare commit graphs + if tasks.WriteCommitGraphs != nil { + commitGraphResult, err := prepareTransactionCommitGraphs(ctx, txnCtx, tasks.WriteCommitGraphs, metrics) + if err != nil { + return nil, fmt.Errorf("preparing commit graphs: %w", err) + } + result.CommitGraphResult = commitGraphResult + } + + // Prepare offloading + if tasks.Offloading != nil { + offloadingResult, err := prepareTransactionOffloading(ctx, txnCtx, tasks.Offloading, metrics) + if err != nil { + return nil, fmt.Errorf("preparing offloading: %w", err) + } + result.OffloadingResult = offloadingResult + } + + // Prepare rehydrating + if tasks.Rehydrating != nil { + rehydratingResult, err := prepareTransactionRehydrating(ctx, txnCtx, tasks.Rehydrating, 
metrics) + if err != nil { + return nil, fmt.Errorf("preparing rehydrating: %w", err) + } + result.RehydratingResult = rehydratingResult + } + + return result, nil +} + +// prepareTransactionPackRefs handles pack-refs preparation for transactions. +func prepareTransactionPackRefs( + ctx context.Context, + txnCtx TransactionContext, + packRefsConfig *config.RunPackRefs, + metrics *Metrics, +) (*PackRefsResult, error) { + span, ctx := tracing.StartSpanIfHasParent(ctx, "housekeeping.prepareTransactionPackRefs", nil) + defer span.Finish() + + finishTimer := metrics.ReportTaskLatency("pack-refs", "prepare") + defer finishTimer() + + repo := txnCtx.SnapshotRepository() + refBackend, err := repo.ReferenceBackend(ctx) + if err != nil { + return nil, fmt.Errorf("reference backend: %w", err) + } + + if refBackend == git.ReferenceBackendReftables { + return prepareTransactionPackRefsReftable(ctx, txnCtx, packRefsConfig, metrics) + } + + return prepareTransactionPackRefsFiles(ctx, txnCtx, packRefsConfig, metrics) +} + +// prepareTransactionPackRefsReftable handles reftable-specific pack-refs preparation. +func prepareTransactionPackRefsReftable( + ctx context.Context, + txnCtx TransactionContext, + packRefsConfig *config.RunPackRefs, + metrics *Metrics, +) (*PackRefsResult, error) { + // Move the reftable-specific logic from transaction manager here + // This would include the reftable compaction logic + return &PackRefsResult{}, nil +} + +// prepareTransactionPackRefsFiles handles files-backend-specific pack-refs preparation. +func prepareTransactionPackRefsFiles( + ctx context.Context, + txnCtx TransactionContext, + packRefsConfig *config.RunPackRefs, + metrics *Metrics, +) (*PackRefsResult, error) { + // Move the files-backend-specific logic from transaction manager here + // This would include the git pack-refs command execution and result processing + return &PackRefsResult{}, nil +} + +// prepareTransactionRepack handles repack preparation for transactions. 
+func prepareTransactionRepack( + ctx context.Context, +txnCtx TransactionContext, + repackConfig *config.RunRepack, + metrics *Metrics, +) (*RepackResult, error) { + span, ctx := tracing.StartSpanIfHasParent(ctx, "housekeeping.prepareTransactionRepack", nil) + defer span.Finish() + + finishTimer := metrics.ReportTaskLatency("repack", "prepare") + defer finishTimer() + + // Move the repacking logic from transaction manager here + // This includes strategy-specific repacking, packfile collection, and WAL recording + return &RepackResult{}, nil +} + +// prepareTransactionCommitGraphs handles commit-graph preparation for transactions. +func prepareTransactionCommitGraphs( + ctx context.Context, + txnCtx TransactionContext, + commitGraphConfig *config.WriteCommitGraphs, + metrics *Metrics, +) (*CommitGraphResult, error) { + span, ctx := tracing.StartSpanIfHasParent(ctx, "housekeeping.prepareTransactionCommitGraphs", nil) + defer span.Finish() + + finishTimer := metrics.ReportTaskLatency("commit-graph", "prepare") + defer finishTimer() + + // Move the commit-graph logic from transaction manager here + // This includes commit-graph writing and WAL recording + return &CommitGraphResult{}, nil +} + +// prepareTransactionOffloading handles offloading preparation for transactions. +func prepareTransactionOffloading( + ctx context.Context, + txnCtx TransactionContext, + offloadingConfig *config.Offloading, + metrics *Metrics, +) (*OffloadingResult, error) { + // Move the offloading logic from transaction manager here + // This includes repacking for offloading, uploading, and config updates + return &OffloadingResult{}, nil +} + +// prepareTransactionRehydrating handles rehydrating preparation for transactions. 
+func prepareTransactionRehydrating( + ctx context.Context, + txnCtx TransactionContext, + rehydratingConfig *config.Rehydrating, + metrics *Metrics, +) (*RehydratingResult, error) { + // TODO: move the rehydrating logic from the transaction manager here. + // This includes downloading packfiles and config updates. Until then this stub returns an empty result. + return &RehydratingResult{}, nil +} diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go index 4c2d0f4e8fa..f4feafa690d 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go @@ -173,7 +173,7 @@ func (mgr *TransactionManager) preparePackRefs(ctx context.Context, transaction return nil } - if err = mgr.preparePackRefsFiles(ctx, transaction); err != nil { + if err = mgr.preparePackRefsFiles(ctx, transaction, mgr.storagePath); err != nil { return fmt.Errorf("files backend: %w", err) } return nil @@ -200,7 +200,7 @@ var regexpLooseObjectDir = regexp.MustCompile("^[[:xdigit:]]{2}$") func (mgr *TransactionManager) prepareRepacking(ctx context.Context, transaction *Transaction) error { defer trace.StartRegion(ctx, "prepareRepacking").End() - if transaction.runHousekeeping.repack == nil { + if transaction.GetHousekeepingTasks().Repack == nil { return nil } @@ -211,19 +211,19 @@ func (mgr *TransactionManager) prepareRepacking(ctx context.Context, transaction defer finishTimer() var err error - repack := transaction.runHousekeeping.repack + repack := transaction.GetHousekeepingTasks().Repack // Build a working repository pointing to snapshot repository. Housekeeping task can access the repository // without the needs for quarantine. 
workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) - repoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) + repoPath := getAbsolutePath(ctx, workingRepository.GetRelativePath(), mgr.storagePath) - isFullRepack, err := housekeeping.ValidateRepacking(repack.config) + isFullRepack, err := housekeeping.ValidateRepacking(repack.Config) if err != nil { return fmt.Errorf("validating repacking: %w", err) } - if repack.config.Strategy == housekeepingcfg.RepackObjectsStrategyIncrementalWithUnreachable { + if repack.Config.Strategy == housekeepingcfg.RepackObjectsStrategyIncrementalWithUnreachable { // Once the transaction manager has been applied and at least one complete repack has occurred, there // should be no loose unreachable objects remaining in the repository. When the transaction manager // processes a change, it consolidates all unreachable objects and objects about to become reachable @@ -465,63 +465,63 @@ func (mgr *TransactionManager) collectPackFiles(ctx context.Context, repoPath st return collectedFiles, nil } -// verifyHousekeeping verifies if all included housekeeping tasks can be performed. Although it's feasible for multiple -// housekeeping tasks running at the same time, it's not guaranteed they are conflict-free. So, we need to ensure there -// is no other concurrent housekeeping task. Each sub-task also needs specific verification. 
-func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transaction *Transaction, refBackend git.ReferenceBackend, zeroOID git.ObjectID) (*gitalypb.LogEntry_Housekeeping, error) { - defer trace.StartRegion(ctx, "verifyHousekeeping").End() - - span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.verifyHousekeeping", nil) - ctx = storage.ContextWithTransaction(ctx, transaction) - defer span.Finish() - - finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "verify") - defer finishTimer() - - // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN. - if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry, objectDependencies map[git.ObjectID]struct{}) error { - if entry.GetHousekeeping() != nil { - return errHousekeepingConflictConcurrent - } - if entry.GetRepositoryDeletion() != nil { - return errConflictRepositoryDeletion - } - - // Applying a repacking operation prunes all loose objects on application. If loose objects were concurrently introduced - // in the repository with the repacking operation, this could lead to corruption if we prune a loose object that is needed. - // Transactions in general only introduce packs, not loose objects. The only exception to this currently is alternate - // unlinking operations where the objects of the alternate are hard linked into the member repository. This can technically - // still introduce loose objects into the repository and trigger this problem as the pools could still have loose objects - // in them until the first repack. - // - // Check if the repository was unlinked from an alternate concurrently. 
- for _, op := range entry.GetOperations() { - switch op := op.GetOperation().(type) { - case *gitalypb.LogEntry_Operation_RemoveDirectoryEntry_: - if string(op.RemoveDirectoryEntry.GetPath()) == stats.AlternatesFilePath(transaction.relativePath) { - return errConcurrentAlternateUnlink - } - } - } - - return nil - }); err != nil { - return nil, fmt.Errorf("walking committed entries: %w", err) - } - - packRefsEntry, err := mgr.verifyPackRefs(ctx, transaction, refBackend, zeroOID) - if err != nil { - return nil, fmt.Errorf("verifying pack refs: %w", err) - } - - if err := mgr.verifyRepacking(ctx, transaction); err != nil { - return nil, fmt.Errorf("verifying repacking: %w", err) - } - - return &gitalypb.LogEntry_Housekeeping{ - PackRefs: packRefsEntry, - }, nil -} +// // verifyHousekeeping verifies if all included housekeeping tasks can be performed. Although it's feasible for multiple +// // housekeeping tasks running at the same time, it's not guaranteed they are conflict-free. So, we need to ensure there +// // is no other concurrent housekeeping task. Each sub-task also needs specific verification. +// func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transaction *Transaction, refBackend git.ReferenceBackend, zeroOID git.ObjectID) (*gitalypb.LogEntry_Housekeeping, error) { +// defer trace.StartRegion(ctx, "verifyHousekeeping").End() + +// span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.verifyHousekeeping", nil) +// ctx = storage.ContextWithTransaction(ctx, transaction) +// defer span.Finish() + +// finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "verify") +// defer finishTimer() + +// // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN. 
+// if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry, objectDependencies map[git.ObjectID]struct{}) error { +// if entry.GetHousekeeping() != nil { +// return errHousekeepingConflictConcurrent +// } +// if entry.GetRepositoryDeletion() != nil { +// return errConflictRepositoryDeletion +// } + +// // Applying a repacking operation prunes all loose objects on application. If loose objects were concurrently introduced +// // in the repository with the repacking operation, this could lead to corruption if we prune a loose object that is needed. +// // Transactions in general only introduce packs, not loose objects. The only exception to this currently is alternate +// // unlinking operations where the objects of the alternate are hard linked into the member repository. This can technically +// // still introduce loose objects into the repository and trigger this problem as the pools could still have loose objects +// // in them until the first repack. +// // +// // Check if the repository was unlinked from an alternate concurrently. +// for _, op := range entry.GetOperations() { +// switch op := op.GetOperation().(type) { +// case *gitalypb.LogEntry_Operation_RemoveDirectoryEntry_: +// if string(op.RemoveDirectoryEntry.GetPath()) == stats.AlternatesFilePath(transaction.relativePath) { +// return errConcurrentAlternateUnlink +// } +// } +// } + +// return nil +// }); err != nil { +// return nil, fmt.Errorf("walking committed entries: %w", err) +// } + +// packRefsEntry, err := mgr.verifyPackRefs(ctx, transaction, refBackend, zeroOID) +// if err != nil { +// return nil, fmt.Errorf("verifying pack refs: %w", err) +// } + +// if err := mgr.verifyRepacking(ctx, transaction); err != nil { +// return nil, fmt.Errorf("verifying repacking: %w", err) +// } + +// return &gitalypb.LogEntry_Housekeeping{ +// PackRefs: packRefsEntry, +// }, nil +// } // verifyPackRefs verifies if the git-pack-refs(1) can be applied without any conflicts. 
// It calls the reference backend specific function to handle the core logic. @@ -655,13 +655,13 @@ func (mgr *TransactionManager) verifyPackRefsReftable(ctx context.Context, trans // repository before the compaction. However, concurrent writes might have occurred which // wrote new tables to the target repository. We shouldn't loose that data. So we merge // the compacted tables.list with the newer tables from the target repository's tables.list. - repoPath := mgr.getAbsolutePath(ctx, transaction.relativePath, storage.WithRootStorage()) + repoPath := getAbsolutePath(ctx, transaction.relativePath, mgr.storagePath, storage.WithRootStorage()) newTableList, err := reftable.ReadTablesList(repoPath) if err != nil { return nil, fmt.Errorf("reading tables.list: %w", err) } - snapshotRepoPath := mgr.getAbsolutePath(storage.ContextWithTransaction(context.Background(), transaction), transaction.snapshotRepository.GetRelativePath()) + snapshotRepoPath := getAbsolutePath(storage.ContextWithTransaction(context.Background(), transaction), transaction.snapshotRepository.GetRelativePath(), mgr.storagePath) // tables.list is hard-linked from the repository to the snapshot, we shouldn't // directly write to it as we'd modify the original. So let's remove the // hard-linked file. @@ -789,7 +789,7 @@ func (mgr *TransactionManager) verifyPackRefsFiles(ctx context.Context, transact if isDir { // If this is a directory, we need to ensure it is actually empty before removing // it. Check if we find any directory entries we haven't yet deleted. 
- entries, err := os.ReadDir(mgr.getAbsolutePath(ctx, relativePath, storage.WithRootStorage())) + entries, err := os.ReadDir(getAbsolutePath(ctx, relativePath, mgr.storagePath, storage.WithRootStorage())) if err != nil { return fmt.Errorf("read dir: %w", err) } @@ -847,7 +847,7 @@ func (mgr *TransactionManager) prepareOffloading(ctx context.Context, transactio workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) + workingRepoPath := getAbsolutePath(ctx, workingRepository.GetRelativePath(), mgr.storagePath) // cfg.Prefix should be empty in production, which triggers automatic UUID generation. // Non-empty prefix values are only used in test environments. @@ -975,7 +975,7 @@ func (mgr *TransactionManager) prepareRehydrating(ctx context.Context, transacti workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) + workingRepoPath := getAbsolutePath(ctx, workingRepository.GetRelativePath(), mgr.storagePath) prefix := transaction.runHousekeeping.runRehydrating.prefix packFilesToDownload, err := mgr.offloadingSink.List(ctx, prefix) -- GitLab