From 4832bb5f40d98205e84dad0ad05554bcd5dafb9e Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Mon, 18 Aug 2025 10:55:14 -0300 Subject: [PATCH] draft: Change locator and rewriterepo draft: Add storage.ContextWithTransaction(ctx, transaction) to test helper generateCommonTests draft: I am debugging run repacking that spans multiple transactions Fixing staging repo in verify repacking, with a stagingCtx TestTransactionManager is working fix clean_test tmpdir fix /command_factory_cgroup_test fix localrepo test fix migration test, add txn to migration ctx fix locator test add stagectx fix post, pre receive hook fix read only snapshot, exclude +gitaly dir enable all transaction tests fix test on snapshot fix snapshot test fix reference transaction hook by extracting tx from txRegistry fix transaction hook fix leftover migration test fix catfile cache key fix reftable backend migration fix middleware test fix /middleware_test.go repoutil remove use original storage path print why dir is not empty apply operation remove error trying to find why remove repo failed fix removeRepository error when leftover_file_migration fix leftover_file_migration_test on remove dir fix leftover_file_migration_test revert repoUtil on removing repo fix delete error in transaction manager fix leftover file migration fix leftover migration test --- internal/cache/walker.go | 2 +- internal/git/catfile/cache.go | 9 +- internal/git/gitcmd/command_factory.go | 6 +- .../git/gitcmd/command_factory_cgroup_test.go | 16 +- internal/git/gitcmd/reference.go | 3 +- internal/git/gittest/repo.go | 8 +- internal/git/localrepo/paths.go | 2 +- internal/git/localrepo/repo.go | 4 +- internal/git/localrepo/repo_test.go | 4 +- internal/git/objectpool/pool.go | 2 +- internal/gitaly/config/locator.go | 15 +- internal/gitaly/config/locator_test.go | 6 +- internal/gitaly/hook/custom.go | 14 +- internal/gitaly/hook/postreceive.go | 4 +- internal/gitaly/hook/prereceive.go | 2 +- internal/gitaly/hook/referencetransaction.go | 3
+ internal/gitaly/hook/update.go | 2 +- internal/gitaly/linguist/language_stats.go | 2 +- internal/gitaly/repoutil/custom_hooks.go | 151 ++++++++++ internal/gitaly/repoutil/remove.go | 3 +- .../service/hook/reference_transaction.go | 23 +- internal/gitaly/storage/context.go | 5 + internal/gitaly/storage/locator.go | 2 +- .../storage/storagemgr/middleware_test.go | 20 +- .../storagemgr/partition/apply_operations.go | 12 + .../migration/leftover_file_migration.go | 2 +- .../migration/leftover_file_migration_test.go | 27 +- .../storagemgr/partition/migration/manager.go | 2 + .../migration/xxxx_ref_backend_migration.go | 3 + .../partition/snapshot/manager_test.go | 8 +- .../storagemgr/partition/snapshot/snapshot.go | 36 ++- .../snapshot/snapshot_filter_test.go | 2 +- .../storagemgr/partition/testhelper_test.go | 24 +- .../partition/transaction_manager.go | 263 ++++++++++++++++-- .../transaction_manager_alternate_test.go | 6 +- .../transaction_manager_housekeeping.go | 27 +- .../transaction_manager_repo_test.go | 5 +- .../partition/transaction_manager_test.go | 9 +- internal/tempdir/clean.go | 2 +- internal/tempdir/clean_test.go | 15 +- internal/tempdir/tempdir.go | 2 +- 41 files changed, 624 insertions(+), 129 deletions(-) diff --git a/internal/cache/walker.go b/internal/cache/walker.go index 9444540c438..3dff2aaa7f2 100644 --- a/internal/cache/walker.go +++ b/internal/cache/walker.go @@ -149,7 +149,7 @@ func (c *DiskCache) moveAndClear(storage config.Storage) error { logger := c.logger.WithField("storage", storage.Name) logger.Info("clearing disk cache object folder") - tempPath, err := c.locator.TempDir(storage.Name) + tempPath, err := c.locator.TempDir(nil, storage.Name) if err != nil { return fmt.Errorf("temp dir: %w", err) } diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index 59d57a5eead..eb83e429256 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -241,7 +241,14 @@ func (c *ProcessCache) 
getOrCreateProcess( span, ctx := tracing.StartSpanIfHasParent(ctx, spanName, nil) defer span.Finish() - cacheKey, isCacheable := newCacheKey(fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())), repo) + //cacheKey, isCacheable := newCacheKey(fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())), repo) + // TODO make this better, hash the txn.FS().Root()? + sessionID := fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())) + txn := storage.ExtractTransaction(ctx) + if txn != nil { + sessionID = fmt.Sprintf("%d+%s", roundToNearestFiveMinute(time.Now()), txn.FS().Root()) + } + cacheKey, isCacheable := newCacheKey(sessionID, repo) if isCacheable { // We only try to look up cached processes in case it is cacheable, which requires a diff --git a/internal/git/gitcmd/command_factory.go b/internal/git/gitcmd/command_factory.go index a31243ef4cd..459fd4f2d62 100644 --- a/internal/git/gitcmd/command_factory.go +++ b/internal/git/gitcmd/command_factory.go @@ -512,9 +512,9 @@ func (cf *ExecCommandFactory) newCommand(ctx context.Context, repo storage.Repos var cgroupsAddCommandOpts []cgroups.AddCommandOption if repo != nil { relativePath := repo.GetRelativePath() - if tx := storage.ExtractTransaction(ctx); tx != nil { - relativePath = tx.OriginalRepository(repo).GetRelativePath() - } + //if tx := storage.ExtractTransaction(ctx); tx != nil { + // relativePath = tx.OriginalRepository(repo).GetRelativePath() + //} cgroupsAddCommandOpts = []cgroups.AddCommandOption{ cgroups.WithCgroupKey(repo.GetStorageName() + "/" + relativePath), } diff --git a/internal/git/gitcmd/command_factory_cgroup_test.go b/internal/git/gitcmd/command_factory_cgroup_test.go index ce50897e629..7e0cafb3f8b 100644 --- a/internal/git/gitcmd/command_factory_cgroup_test.go +++ b/internal/git/gitcmd/command_factory_cgroup_test.go @@ -9,7 +9,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/cgroups" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" 
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -65,11 +67,15 @@ func TestNewCommandAddsToCgroup(t *testing.T) { type mockTransaction struct { storage.Transaction originalRepo *gitalypb.Repository + fs storage.FS } func (m *mockTransaction) OriginalRepository(storage.Repository) *gitalypb.Repository { return m.originalRepo } +func (m *mockTransaction) FS() storage.FS { + return m.fs +} func TestNewCommandCgroupStable(t *testing.T) { t.Parallel() @@ -109,10 +115,14 @@ func TestNewCommandCgroupStable(t *testing.T) { require.NoError(t, err) defer cleanup() - originalRepo := &gitalypb.Repository{StorageName: "default", RelativePath: "some/relative/path"} + //originalRepo := &gitalypb.Repository{StorageName: "default", RelativePath: "some/relative/path"} + locator := config.NewLocator(cfg) + storagePath, err := locator.GetStorageByName(ctx, "default") + require.NoError(t, err) ctx = storage.ContextWithTransaction(ctx, &mockTransaction{ - originalRepo: originalRepo, + fs: fsrecorder.NewFS(storagePath, nil), + //originalRepo: originalRepo, }) cmd, err := gitCmdFactory.New(ctx, repo, gitcmd.Command{ @@ -128,6 +138,6 @@ func TestNewCommandCgroupStable(t *testing.T) { require.NotNil(t, customFields) logrusFields := customFields.Fields() - require.Equal(t, originalRepo.GetStorageName()+"/"+originalRepo.GetRelativePath(), logrusFields["command.cgroup_path"]) + require.Equal(t, "default/"+repo.GetRelativePath(), logrusFields["command.cgroup_path"]) }) } diff --git a/internal/git/gitcmd/reference.go b/internal/git/gitcmd/reference.go index a54f4674356..4cc7a3379ed 100644 --- 
a/internal/git/gitcmd/reference.go +++ b/internal/git/gitcmd/reference.go @@ -66,10 +66,11 @@ func GetReferences(ctx context.Context, repoExecutor RepositoryExecutor, cfg Get // GetSymbolicRef reads the symbolic reference. func GetSymbolicRef(ctx context.Context, repoExecutor RepositoryExecutor, refname git.ReferenceName) (git.Reference, error) { var stdout strings.Builder + var stderr strings.Builder if err := repoExecutor.ExecAndWait(ctx, Command{ Name: "symbolic-ref", Args: []string{string(refname)}, - }, WithDisabledHooks(), WithStdout(&stdout)); err != nil { + }, WithDisabledHooks(), WithStdout(&stdout), WithStderr(&stderr)); err != nil { return git.Reference{}, err } diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go index fb15ffff946..69a7c80faf6 100644 --- a/internal/git/gittest/repo.go +++ b/internal/git/gittest/repo.go @@ -130,6 +130,8 @@ func CreateRepository(tb testing.TB, ctx context.Context, cfg config.Cfg, config require.Less(tb, len(configs), 2, "you must either pass no or exactly one option") + //ffSet := featureflag.FromContext(ctx) + opts := CreateRepositoryConfig{} if len(configs) == 1 { opts = configs[0] @@ -205,7 +207,11 @@ func CreateRepository(tb testing.TB, ctx context.Context, cfg config.Cfg, config tb.Cleanup(func() { // The ctx parameter would be canceled by now as the tests defer the cancellation. 
- if _, err := client.RemoveRepository(context.TODO(), &gitalypb.RemoveRepositoryRequest{ + ctx := context.TODO() + //for k, v := range ffSet { + // ctx = featureflag.ContextWithFeatureFlag(ctx, k, v) + //} + if _, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: repository, }); err != nil { if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { diff --git a/internal/git/localrepo/paths.go b/internal/git/localrepo/paths.go index e463334b736..aa15f70d4bd 100644 --- a/internal/git/localrepo/paths.go +++ b/internal/git/localrepo/paths.go @@ -59,7 +59,7 @@ func (repo *Repo) ObjectDirectoryPath(ctx context.Context) (string, error) { // have a repository-specific prefix which we must check in order to determine whether the // quarantine directory does in fact belong to the repo at hand. if _, origError := storage.ValidateRelativePath(repoPath, objectDirectoryPath); origError != nil { - tempDir, err := repo.locator.TempDir(repo.GetStorageName()) + tempDir, err := repo.locator.TempDir(ctx, repo.GetStorageName()) if err != nil { return "", structerr.NewInvalidArgument("getting storage's temporary directory: %w", err) } diff --git a/internal/git/localrepo/repo.go b/internal/git/localrepo/repo.go index 514c9eba8b5..2d19d8a9eb3 100644 --- a/internal/git/localrepo/repo.go +++ b/internal/git/localrepo/repo.go @@ -222,8 +222,8 @@ func errorWithStderr(err error, stderr []byte) error { // StorageTempDir returns the temporary dir for the storage where the repo is on. // When this directory does not exist yet, it's being created. 
-func (repo *Repo) StorageTempDir() (string, error) { - tempPath, err := repo.locator.TempDir(repo.GetStorageName()) +func (repo *Repo) StorageTempDir(ctx context.Context) (string, error) { + tempPath, err := repo.locator.TempDir(ctx, repo.GetStorageName()) if err != nil { return "", err } diff --git a/internal/git/localrepo/repo_test.go b/internal/git/localrepo/repo_test.go index 8cfa3f0120b..cd631e835e6 100644 --- a/internal/git/localrepo/repo_test.go +++ b/internal/git/localrepo/repo_test.go @@ -260,11 +260,11 @@ func TestRepo_StorageTempDir(t *testing.T) { }) repo := New(testhelper.NewLogger(t), locator, gitCmdFactory, catfileCache, repoProto) - expected, err := locator.TempDir(cfg.Storages[0].Name) + expected, err := locator.TempDir(nil, cfg.Storages[0].Name) require.NoError(t, err) require.NoDirExists(t, expected) - tempPath, err := repo.StorageTempDir() + tempPath, err := repo.StorageTempDir(nil) require.NoError(t, err) require.DirExists(t, expected) require.Equal(t, expected, tempPath) diff --git a/internal/git/objectpool/pool.go b/internal/git/objectpool/pool.go index cd1d297ee8c..46e93acf140 100644 --- a/internal/git/objectpool/pool.go +++ b/internal/git/objectpool/pool.go @@ -64,7 +64,7 @@ func FromProto( // When creating repositories in the ObjectPool service we will first create the // repository in a temporary directory. So we need to check whether the path we see // here is in such a temporary directory and let it pass. 
- tempDir, err := locator.TempDir(proto.GetRepository().GetStorageName()) + tempDir, err := locator.TempDir(ctx, proto.GetRepository().GetStorageName()) if err != nil { return nil, fmt.Errorf("getting temporary storage directory: %w", err) } diff --git a/internal/gitaly/config/locator.go b/internal/gitaly/config/locator.go index bc6bc2740e6..2f8f7d568e0 100644 --- a/internal/gitaly/config/locator.go +++ b/internal/gitaly/config/locator.go @@ -149,12 +149,18 @@ func (l *configLocator) GetRepoPath(ctx context.Context, repo storage.Repository } // GetStorageByName will return the path for the storage, which is fetched by -// its key. An error is return if it cannot be found. +// its key. If ctx carries a transaction, the root of the transaction's file system snapshot is returned instead. +// An error is returned if the storage cannot be found. func (l *configLocator) GetStorageByName(ctx context.Context, storageName string) (string, error) { if storageName == "" { return "", structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) } + tx := storage.ExtractTransaction(ctx) + if tx != nil { + return tx.FS().Root(), nil + } + storagePath, ok := l.conf.StoragePath(storageName) if !ok { return "", storage.NewStorageNotFoundError(storageName) @@ -174,7 +180,12 @@ func (l *configLocator) StateDir(storageName string) (string, error) { } // TempDir returns the path to the temp dir for a storage.
-func (l *configLocator) TempDir(storageName string) (string, error) { +func (l *configLocator) TempDir(ctx context.Context, storageName string) (string, error) { + if ctx != nil { + if txn := storage.ExtractTransaction(ctx); txn != nil { + return filepath.Join(txn.FS().Root(), tmpRootPrefix), nil + } + } return l.getPath(storageName, tmpRootPrefix) } diff --git a/internal/gitaly/config/locator_test.go b/internal/gitaly/config/locator_test.go index 07d95f9e12d..3c957956fe3 100644 --- a/internal/gitaly/config/locator_test.go +++ b/internal/gitaly/config/locator_test.go @@ -242,20 +242,20 @@ func TestConfigLocator_TempDir(t *testing.T) { locator := config.NewLocator(cfg) t.Run("storage exists", func(t *testing.T) { - path, err := locator.TempDir(storageName) + path, err := locator.TempDir(nil, storageName) require.NoError(t, err) require.Equal(t, path, filepath.Join(cfg.Storages[0].Path, "+gitaly/tmp")) }) t.Run("storage doesn't exist on disk", func(t *testing.T) { require.NoError(t, os.RemoveAll(cfg.Storages[1].Path)) - path, err := locator.TempDir(cfg.Storages[1].Name) + path, err := locator.TempDir(nil, cfg.Storages[1].Name) require.NoError(t, err) require.Equal(t, filepath.Join(cfg.Storages[1].Path, "+gitaly/tmp"), path) }) t.Run("unknown storage", func(t *testing.T) { - _, err := locator.TempDir("unknown") + _, err := locator.TempDir(nil, "unknown") require.Equal(t, structerr.NewInvalidArgument(`tmp dir: no such storage: "unknown"`), err) }) } diff --git a/internal/gitaly/hook/custom.go b/internal/gitaly/hook/custom.go index e1403cf1516..c55cae769a5 100644 --- a/internal/gitaly/hook/custom.go +++ b/internal/gitaly/hook/custom.go @@ -47,6 +47,7 @@ func (e CustomHookError) Unwrap() error { // // Any files which are either not executable or have a trailing `~` are ignored. 
func (m *GitLabHookManager) newCustomHooksExecutor(ctx context.Context, repo *gitalypb.Repository, hookName string) (customHooksExecutor, error) { + repoPath, err := m.locator.GetRepoPath(ctx, repo) if err != nil { return nil, err @@ -161,8 +162,17 @@ func isValidHook(path string) bool { return true } -func (m *GitLabHookManager) customHooksEnv(ctx context.Context, payload gitcmd.HooksPayload, pushOptions []string, envs []string) ([]string, error) { - repoPath, err := m.locator.GetRepoPath(ctx, payload.Repo, storage.WithRepositoryVerificationSkipped()) +func (m *GitLabHookManager) customHooksEnv(ctx context.Context, payload gitcmd.HooksPayload, pushOptions []string, envs []string, + useCtxToFindRepoPath bool) ([]string, error) { + + var repoPath string + var err error + if useCtxToFindRepoPath { + repoPath, err = m.locator.GetRepoPath(ctx, payload.Repo, storage.WithRepositoryVerificationSkipped()) + } else { + repoPath, err = m.locator.GetRepoPath(nil, payload.Repo, storage.WithRepositoryVerificationSkipped()) + + } if err != nil { return nil, err } diff --git a/internal/gitaly/hook/postreceive.go b/internal/gitaly/hook/postreceive.go index 888d3dfaeea..784815e91cc 100644 --- a/internal/gitaly/hook/postreceive.go +++ b/internal/gitaly/hook/postreceive.go @@ -227,12 +227,12 @@ func (m *GitLabHookManager) postReceiveHook(ctx context.Context, payload gitcmd. 
return errors.New("") } - executor, err := m.newCustomHooksExecutor(ctx, repo, "post-receive") + executor, err := m.newCustomHooksExecutor(nil, repo, "post-receive") if err != nil { return structerr.NewInternal("creating custom hooks executor: %w", err) } - customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, env) + customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, env, false) if err != nil { return structerr.NewInternal("constructing custom hook environment: %w", err) } diff --git a/internal/gitaly/hook/prereceive.go b/internal/gitaly/hook/prereceive.go index bf638e7b780..e26d788ffc4 100644 --- a/internal/gitaly/hook/prereceive.go +++ b/internal/gitaly/hook/prereceive.go @@ -172,7 +172,7 @@ func (m *GitLabHookManager) preReceiveHook(ctx context.Context, payload gitcmd.H return fmt.Errorf("creating custom hooks executor: %w", err) } - customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, envs) + customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, envs, true) if err != nil { return structerr.NewInternal("constructing custom hook environment: %w", err) } diff --git a/internal/gitaly/hook/referencetransaction.go b/internal/gitaly/hook/referencetransaction.go index 58d3b8c4e84..5297ec079b3 100644 --- a/internal/gitaly/hook/referencetransaction.go +++ b/internal/gitaly/hook/referencetransaction.go @@ -48,6 +48,9 @@ func (m *GitLabHookManager) ReferenceTransactionHook(ctx context.Context, state if err != nil { return fmt.Errorf("get transaction: %w", err) } + if tx != nil { + ctx = storage.ContextWithTransaction(ctx, tx) + } } var phase voting.Phase diff --git a/internal/gitaly/hook/update.go b/internal/gitaly/hook/update.go index ee61df42246..1d03cb098f7 100644 --- a/internal/gitaly/hook/update.go +++ b/internal/gitaly/hook/update.go @@ -63,7 +63,7 @@ func (m *GitLabHookManager) updateHook(ctx context.Context, payload gitcmd.Hooks return structerr.NewInternal("%w", err) } - customEnv, err := m.customHooksEnv(ctx, payload, nil, env) 
+ customEnv, err := m.customHooksEnv(ctx, payload, nil, env, true) if err != nil { return structerr.NewInternal("constructing custom hook environment: %w", err) } diff --git a/internal/gitaly/linguist/language_stats.go b/internal/gitaly/linguist/language_stats.go index 1b432b6d1cf..d130c43f852 100644 --- a/internal/gitaly/linguist/language_stats.go +++ b/internal/gitaly/linguist/language_stats.go @@ -127,7 +127,7 @@ func (c *languageStats) save(ctx context.Context, repo *localrepo.Repo, commitID return fmt.Errorf("languageStats save get repo path: %w", err) } - tempPath, err := repo.StorageTempDir() + tempPath, err := repo.StorageTempDir(ctx) if err != nil { return fmt.Errorf("languageStats locate temp dir: %w", err) } diff --git a/internal/gitaly/repoutil/custom_hooks.go b/internal/gitaly/repoutil/custom_hooks.go index 39448676fea..fd06577e7fa 100644 --- a/internal/gitaly/repoutil/custom_hooks.go +++ b/internal/gitaly/repoutil/custom_hooks.go @@ -112,6 +112,7 @@ func SetCustomHooks( var originalCustomHooksRelativePath string if tx := storage.ExtractTransaction(ctx); tx != nil { + //originalRelativePath := repo.GetRelativePath() originalRelativePath, err := filepath.Rel(tx.FS().Root(), repoPath) if err != nil { return fmt.Errorf("original relative path: %w", err) @@ -243,6 +244,156 @@ func SetCustomHooks( return nil } +// SetCustomHooks2 is like SetCustomHooks but takes skipTxn. Used by a test helper whose ctx carries a transaction so paths resolve under FS().Root(); +// when skipTxn is true, the directory removal/creation is not recorded in that transaction (no WAL-related logic runs).
+func SetCustomHooks2( + ctx context.Context, + logger log.Logger, + locator storage.Locator, + txManager transaction.Manager, + reader io.Reader, + repo storage.Repository, + skipTxn bool, +) error { + repoPath, err := locator.GetRepoPath(ctx, repo) + if err != nil { + return fmt.Errorf("getting repo path: %w", err) + } + + var originalCustomHooksRelativePath string + if tx := storage.ExtractTransaction(ctx); tx != nil && !skipTxn { + //originalRelativePath := repo.GetRelativePath() + originalRelativePath, err := filepath.Rel(tx.FS().Root(), repoPath) + if err != nil { + return fmt.Errorf("original relative path: %w", err) + } + + originalCustomHooksRelativePath = filepath.Join(originalRelativePath, CustomHooksDir) + + // Log a deletion of the existing custom hooks so they are removed before the + // new ones are put in place. + if err := storage.RecordDirectoryRemoval( + tx.FS(), tx.FS().Root(), originalCustomHooksRelativePath, + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("record custom hook removal: %w", err) + } + } + + // The `custom_hooks` directory in the repository is locked to prevent + // concurrent modification of hooks. + hooksLock, err := safe.NewLockingDirectory(repoPath, CustomHooksDir) + if err != nil { + return fmt.Errorf("creating hooks lock: %w", err) + } + + if err := hooksLock.Lock(); err != nil { + return fmt.Errorf("locking hooks: %w", err) + } + defer func() { + // If the `.lock` file is not removed from the `custom_hooks` directory, + // future modifications to the repository's hooks will be prevented. If + // this occurs, the `.lock` file will have to be manually removed. + if err := hooksLock.Unlock(); err != nil { + logger.WithError(err).ErrorContext(ctx, "failed to unlock hooks") + } + }() + + // Create a temporary directory to write the new hooks to and also + // temporarily store the current repository hooks. 
This enables "atomic" + // directory swapping by acting as an intermediary storage location between + // moves. + tmpDir, err := tempdir.NewWithoutContext(repo.GetStorageName(), logger, locator) + if err != nil { + return fmt.Errorf("creating temp directory: %w", err) + } + + defer func() { + if err := os.RemoveAll(tmpDir.Path()); err != nil { + logger.WithError(err).WarnContext(ctx, "failed to remove temporary directory") + } + }() + + if err := ExtractHooks(ctx, logger, reader, tmpDir.Path(), false); err != nil { + return fmt.Errorf("extracting hooks: %w", err) + } + + tempHooksPath := filepath.Join(tmpDir.Path(), CustomHooksDir) + + // No hooks will be extracted if the tar archive is empty. If this happens + // it means the repository should be set with an empty `custom_hooks` + // directory. Create `custom_hooks` in the temporary directory so that any + // existing repository hooks will be replaced with this empty directory. + if err := os.Mkdir(tempHooksPath, mode.Directory); err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("making temp hooks directory: %w", err) + } + + preparedVote, err := newDirectoryVote(tempHooksPath) + if err != nil { + return fmt.Errorf("generating prepared vote: %w", err) + } + + // Cast prepared vote with hash of the extracted archive in the temporary + // `custom_hooks` directory. + if err := voteCustomHooks(ctx, txManager, preparedVote, voting.Prepared); err != nil { + return fmt.Errorf("casting prepared vote: %w", err) + } + + repoHooksPath := filepath.Join(repoPath, CustomHooksDir) + prevHooksPath := filepath.Join(tmpDir.Path(), "previous_hooks") + + // If the `custom_hooks` directory exists in the repository, move the + // current hooks to `previous_hooks` in the temporary directory. 
+ if err := os.Rename(repoHooksPath, prevHooksPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("moving current hooks to temp: %w", err) + } + + syncer := safe.NewSyncer() + + if storage.NeedsSync(ctx) { + // Sync the custom hooks in the temporary directory before being moved into + // the repository. This makes the move atomic as there is no state where the + // move succeeds, but the hook files themselves are not yet on the disk, or + // are partially written. + if err := syncer.SyncRecursive(ctx, tempHooksPath); err != nil { + return fmt.Errorf("syncing extracted custom hooks: %w", err) + } + } + + // Move `custom_hooks` from the temporary directory to the repository. + if err := os.Rename(tempHooksPath, repoHooksPath); err != nil { + return fmt.Errorf("moving new hooks to repo: %w", err) + } + + if storage.NeedsSync(ctx) { + // Sync the parent directory after a move to ensure the directory entry of the + // hooks directory is flushed to the disk. + if err := syncer.SyncParent(ctx, repoHooksPath); err != nil { + return fmt.Errorf("syncing custom hooks parent directory: %w", err) + } + } + + committedVote, err := newDirectoryVote(repoHooksPath) + if err != nil { + return fmt.Errorf("generating committed vote: %w", err) + } + + // Cast committed vote with hash of the extracted archive in the repository + // `custom_hooks` directory. + if err := voteCustomHooks(ctx, txManager, committedVote, voting.Committed); err != nil { + return fmt.Errorf("casting committed vote: %w", err) + } + + if tx := storage.ExtractTransaction(ctx); tx != nil && !skipTxn { + if err := storage.RecordDirectoryCreation( + tx.FS(), originalCustomHooksRelativePath, + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("record custom hook creation: %w", err) + } + } + + return nil +} + // newDirectoryVote creates a voting.VoteHash by walking the specified path and // generating a hash based on file name, permissions, and data. 
func newDirectoryVote(basePath string) (*voting.VoteHash, error) { diff --git a/internal/gitaly/repoutil/remove.go b/internal/gitaly/repoutil/remove.go index d74429e7224..adede28ca9c 100644 --- a/internal/gitaly/repoutil/remove.go +++ b/internal/gitaly/repoutil/remove.go @@ -66,7 +66,8 @@ func remove( } } - tempDir, err := locator.TempDir(repository.GetStorageName()) + // Need a tmp folder in OriginalStoragePath + tempDir, err := locator.TempDir(ctx, repository.GetStorageName()) if err != nil { return structerr.NewInternal("temporary directory: %w", err) } diff --git a/internal/gitaly/service/hook/reference_transaction.go b/internal/gitaly/service/hook/reference_transaction.go index 410e3b0a897..7aef4685ce0 100644 --- a/internal/gitaly/service/hook/reference_transaction.go +++ b/internal/gitaly/service/hook/reference_transaction.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" @@ -17,12 +18,32 @@ func validateReferenceTransactionHookRequest(ctx context.Context, locator storag } func (s *server) ReferenceTransactionHook(stream gitalypb.HookService_ReferenceTransactionHookServer) error { + request, err := stream.Recv() if err != nil { return structerr.NewInternal("receiving first request: %w", err) } - if err := validateReferenceTransactionHookRequest(stream.Context(), s.locator, request); err != nil { + // TODO change this to be better + ctx := stream.Context() + var tx storage.Transaction + env := request.GetEnvironmentVariables() + payload, err := gitcmd.HooksPayloadFromEnv(env) + if err == nil { + if payload.TransactionID > 0 { + tx, err = s.txRegistry.Get(payload.TransactionID) + + // TODO in some test, s.txRegistry and s.manager.txRegistry is not the same + //if err != nil { + // return fmt.Errorf("get transaction: %w", err) + //} + 
} + if tx != nil { + ctx = storage.ContextWithTransaction(ctx, tx) + } + } + + if err := validateReferenceTransactionHookRequest(ctx, s.locator, request); err != nil { return structerr.NewInvalidArgument("%w", err) } diff --git a/internal/gitaly/storage/context.go b/internal/gitaly/storage/context.go index dc076310c9d..d99bccc5c4d 100644 --- a/internal/gitaly/storage/context.go +++ b/internal/gitaly/storage/context.go @@ -39,6 +39,11 @@ func ContextWithTransaction(ctx context.Context, tx Transaction) context.Context // ExtractTransaction extracts the transaction from the context. Nil is returned if there's // no transaction in the context. func ExtractTransaction(ctx context.Context) Transaction { + + if ctx == nil { + return nil + } + value := ctx.Value(keyTransaction{}) if value == nil { return nil diff --git a/internal/gitaly/storage/locator.go b/internal/gitaly/storage/locator.go index 9be1ae0f6a6..ae7c4afdda5 100644 --- a/internal/gitaly/storage/locator.go +++ b/internal/gitaly/storage/locator.go @@ -92,7 +92,7 @@ type Locator interface { // CacheDir returns the path to the cache dir for a storage. CacheDir(storageName string) (string, error) // TempDir returns the path to the temp dir for a storage. - TempDir(storageName string) (string, error) + TempDir(ctx context.Context, storageName string) (string, error) // StateDir returns the path to the state dir for a storage. StateDir(storageName string) (string, error) // PartitionsDir returns the path to the partitions dir for a storage. 
diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go index 0cd3ca080f7..69dce18d1de 100644 --- a/internal/gitaly/storage/storagemgr/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/middleware_test.go @@ -351,8 +351,10 @@ messages and behavior by erroring out the requests before they even hit this int }, assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) { expected := validAdditionalRepository() - // The additional repository's relative path should have been rewritten. - require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + //// The additional repository's relative path should have been rewritten. + //require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + + require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) }, @@ -384,8 +386,9 @@ messages and behavior by erroring out the requests before they even hit this int }, assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) { expected := validAdditionalRepository() - // The additional repository's relative path should have been rewritten. - require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + //// The additional repository's relative path should have been rewritten. + //require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. 
testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) }, @@ -535,8 +538,13 @@ messages and behavior by erroring out the requests before they even hit this int expectedRepo := validRepository() actualRepo := repo - // When run in a transaction, the relative path will be pointed to the snapshot. - assert.NotEqual(t, expectedRepo.GetRelativePath(), repo.GetRelativePath()) + // When run in a transaction, the relative path will be the same, but the + // transaction should be included in the ctx + tx := storage.ExtractTransaction(ctx) + assert.NotNil(t, tx, "transaction not in the context") + assert.Equal(t, expectedRepo.GetRelativePath(), repo.GetRelativePath(), + "relative path should be the same even when in transaction") + expectedRepo.RelativePath = "" actualRepo.RelativePath = "" diff --git a/internal/gitaly/storage/storagemgr/partition/apply_operations.go b/internal/gitaly/storage/storagemgr/partition/apply_operations.go index a531f804c51..7e498c588b6 100644 --- a/internal/gitaly/storage/storagemgr/partition/apply_operations.go +++ b/internal/gitaly/storage/storagemgr/partition/apply_operations.go @@ -60,6 +60,18 @@ func applyOperations(ctx context.Context, sync func(context.Context, string) err path := string(op.GetPath()) if err := os.Remove(filepath.Join(storageRoot, path)); err != nil && !errors.Is(err, fs.ErrNotExist) { + //entries, err := os.ReadDir(filepath.Join(storageRoot, path)) + //var entriesString []string + //fmt.Printf("Contents of directory '%s':\n", path) + //for _, entry := range entries { + // if entry.IsDir() { + // fmt.Printf("[DIR] %s\n", entry.Name()) + // } else { + // fmt.Printf("[FILE] %s\n", entry.Name()) + // } + // entriesString = append(entriesString, entry.Name()) + //} + //return fmt.Errorf("remove: dirs %v, %w", entriesString, err) return fmt.Errorf("remove: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go 
b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go index b4aa3d6f7f6..93c66163ef2 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go @@ -31,7 +31,7 @@ func NewLeftoverFileMigration(locator storage.Locator) Migration { Fn: func(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { // Use snapshotFilter to match entry paths that must be kept in the repo. snapshotFilter := snapshot.NewRegexSnapshotFilter() - storagePath, err := locator.GetStorageByName(ctx, storageName) + storagePath, err := locator.GetStorageByName(context.Background(), storageName) // txn-free ctx instead of nil; presumably yields the original storage path as nil did — TODO confirm if err != nil { return fmt.Errorf("resolve storage path: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go index 6004432011d..0d4be0384f5 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go @@ -6,7 +6,6 @@ import ( "maps" "os" "path/filepath" - "sync" "testing" "github.com/stretchr/testify/require" @@ -22,7 +21,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/transactiontest" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -35,15 +33,7 @@ func TestNewLeftoverFileMigration_WithOrWithoutFeatureFlag(t *testing.T) { } func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { - var mu sync.RWMutex t.Parallel() - cfg := testcfg.Build(t) - - var migrationPtr *[]migration.Migration - var migrations
[]migration.Migration - migrationPtr = &migrations - repoClient, socket := runGitalyServer(t, cfg, testserver.WithMigrations(migrationPtr)) - cfg.SocketPath = socket for _, tc := range []struct { desc string @@ -60,6 +50,13 @@ func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() + cfg := testcfg.Build(t) + + var migrationPtr *[]migration.Migration + var migrations []migration.Migration + migrationPtr = &migrations + repoClient, socket := runGitalyServer(t, cfg, testserver.WithMigrations(migrationPtr)) + cfg.SocketPath = socket poolSetup := createLeftoverMigrationRepo(t, ctx, cfg, true, "") repoSetup := createLeftoverMigrationRepo(t, ctx, cfg, false, poolSetup.repoPath) @@ -93,22 +90,12 @@ func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { } // Avoid racing if other test also change the migration slice. - mu.Lock() migrations = []migration.Migration{migration.NewLeftoverFileMigration(config.NewLocator(cfg))} _, err = repoClient.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{ Repository: repoSetup.repo, }) - mu.Unlock() - require.NoError(t, err) - - // Force WAL sync to ensure migration transaction effects are applied to disk - // before checking directory state. Migration commits to WAL but doesn't - // guarantee immediate filesystem application. 
- conn, err := client.New(ctx, cfg.SocketPath) require.NoError(t, err) - defer testhelper.MustClose(t, conn) - transactiontest.ForceWALSync(t, ctx, conn, repoSetup.repo) // Verify repo directory testhelper.RequireDirectoryState(t, repoSetup.repoPath, "", repoSetup.expectedRepoDirState) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go index c5d1c0692ac..d46698e5443 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go @@ -160,6 +160,8 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B if err != nil { return fmt.Errorf("begin migration update: %w", err) } + ctx = storage.ContextWithTransaction(ctx, txn) + defer func() { if returnedErr != nil { if err := txn.Rollback(ctx); err != nil { diff --git a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go index f692dba2276..7bcd5e15d24 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go @@ -29,6 +29,9 @@ func NewReferenceBackendMigration( ID: id, Name: name, Fn: func(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { + + ctx = storage.ContextWithTransaction(ctx, tx) + scopedFactory, err := localRepoFactory.ScopeByStorage(ctx, storageName) if err != nil { return fmt.Errorf("creating storage scoped factory: %w", err) diff --git a/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go b/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go index 8d1d9ca8fab..da369c6c815 100644 --- a/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go +++ 
b/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go @@ -148,10 +148,10 @@ func TestManager(t *testing.T) { require.Equal(t, fs1.Root(), fs2.Root()) // Writing into shared snapshots is not allowed. - require.ErrorIs(t, os.WriteFile(filepath.Join(fs1.Root(), "some file"), nil, fs.ModePerm), os.ErrPermission) + require.ErrorIs(t, os.WriteFile(filepath.Join(fs1.Root(), "repositories", "some file"), nil, fs.ModePerm), os.ErrPermission) expectedDirectoryState := testhelper.DirectoryState{ - "/": {Mode: ModeReadOnlyDirectory}, + "/": {Mode: mode.Directory}, "/repositories": {Mode: ModeReadOnlyDirectory}, "/repositories/a": {Mode: ModeReadOnlyDirectory}, "/repositories/a/refs": {Mode: ModeReadOnlyDirectory}, @@ -185,7 +185,7 @@ func TestManager(t *testing.T) { require.Equal(t, fs1.Root(), fs2.Root()) expectedDirectoryState := testhelper.DirectoryState{ - "/": {Mode: ModeReadOnlyDirectory}, + "/": {Mode: mode.Directory}, "/repositories": {Mode: ModeReadOnlyDirectory}, "/repositories/a": {Mode: ModeReadOnlyDirectory}, "/repositories/a/refs": {Mode: ModeReadOnlyDirectory}, @@ -217,7 +217,7 @@ func TestManager(t *testing.T) { defer testhelper.MustClose(t, fs1) testhelper.RequireDirectoryState(t, fs1.Root(), "", testhelper.DirectoryState{ - "/": {Mode: ModeReadOnlyDirectory}, + "/": {Mode: mode.Directory}, "/pools": {Mode: ModeReadOnlyDirectory}, "/pools/b": {Mode: ModeReadOnlyDirectory}, "/pools/b/refs": {Mode: ModeReadOnlyDirectory}, diff --git a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go index 0820577d2fb..683c92caee0 100644 --- a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go +++ b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/gitstorage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" @@ -62,7 +63,7 @@ func (s *snapshot) RelativePath(relativePath string) string { func (s *snapshot) Close() error { if s.readOnly { // Make the directories writable again so we can remove the snapshot. - if err := s.setDirectoryMode(mode.Directory); err != nil { + if err := s.setDirectoryMode(mode.Directory, nil); err != nil { return fmt.Errorf("make writable: %w", err) } } @@ -75,8 +76,35 @@ func (s *snapshot) Close() error { } // setDirectoryMode walks the snapshot and sets each directory's mode to the given mode. -func (s *snapshot) setDirectoryMode(mode fs.FileMode) error { - return storage.SetDirectoryMode(s.root, mode) +func (s *snapshot) setDirectoryMode(mode fs.FileMode, paths []string) error { + //if len(paths) == 0 { + // return storage.SetDirectoryMode(s.root, mode) + //} + //for _, path := range paths { + // if err := storage.SetDirectoryMode(filepath.Join(s.root, path), mode); err != nil { + // return err + // } + //} + //return nil + + entries, err := os.ReadDir(s.root) + if err != nil { + return err + } + + if len(entries) == 0 { + return storage.SetDirectoryMode(s.root, mode) + } + + for _, entry := range entries { + if entry.Name() == config.GitalyDataPrefix { + continue + } + if err := storage.SetDirectoryMode(filepath.Join(s.root, entry.Name()), mode); err != nil { + return err + } + } + return nil } // newSnapshot creates a new file system snapshot of the given root directory. The snapshot is created by copying @@ -111,7 +139,7 @@ func newSnapshot(ctx context.Context, storageRoot, snapshotRoot string, relative if readOnly { // Now that we've finished creating the snapshot, change the directory permissions to read-only // to prevent writing in the snapshot. 
- if err := s.setDirectoryMode(ModeReadOnlyDirectory); err != nil { + if err := s.setDirectoryMode(ModeReadOnlyDirectory, relativePaths); err != nil { return nil, fmt.Errorf("make read-only: %w", err) } } diff --git a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go index 134df086a64..f1eb6ecbd10 100644 --- a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go +++ b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go @@ -238,7 +238,7 @@ func getExpectedDirectoryStateAfterSnapshotFilter(ctx context.Context, isExclusi stateMustStay := testhelper.DirectoryState{ // The snapshotting process does not use the existing permissions for // directories in the hierarchy before the repository directories. - "/": {Mode: dirMode()}, + "/": {Mode: mode.Directory}, "/repositories": {Mode: dirMode()}, "/pools": {Mode: dirMode()}, diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 32343607795..210ac556051 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1399,7 +1399,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } if step.UpdateGitConfig != nil { - updateGitConfig(t, ctx, rewrittenRepo, step.UpdateGitConfig, transaction) + updateGitConfig(t, storage.ContextWithTransaction(ctx, transaction), rewrittenRepo, step.UpdateGitConfig, transaction) } if step.QuarantinedPacks != nil { @@ -1415,17 +1415,17 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } for _, pack := range step.QuarantinedPacks { - require.NoError(t, rewrittenRepo.UnpackObjects(ctx, bytes.NewReader(pack))) + require.NoError(t, rewrittenRepo.UnpackObjects(storage.ContextWithTransaction(ctx, 
transaction), bytes.NewReader(pack))) } } if step.ReferenceUpdates != nil { - require.NoError(t, performReferenceUpdates(t, ctx, transaction, rewrittenRepo, step.ReferenceUpdates)) + require.NoError(t, performReferenceUpdates(t, storage.ContextWithTransaction(ctx, transaction), transaction, rewrittenRepo, step.ReferenceUpdates)) } if step.DefaultBranchUpdate != nil { require.NoError(t, rewrittenRepo.SetDefaultBranch(storage.ContextWithTransaction(ctx, transaction), nil, step.DefaultBranchUpdate.Reference)) - require.NoError(t, transaction.UpdateReferences(ctx, map[git.ReferenceName]git.ReferenceUpdate{ + require.NoError(t, transaction.UpdateReferences(storage.ContextWithTransaction(ctx, transaction), map[git.ReferenceName]git.ReferenceUpdate{ "HEAD": {NewTarget: step.DefaultBranchUpdate.Reference}, })) } @@ -1475,7 +1475,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it") transaction := openTransactions[step.TransactionID] - require.NoError(t, transaction.RecordInitialReferenceValues(ctx, step.InitialValues)) + require.NoError(t, transaction.RecordInitialReferenceValues(storage.ContextWithTransaction(ctx, transaction), step.InitialValues)) case UpdateReferences: require.Contains(t, openTransactions, step.TransactionID, "test error: reference updates aborted on committed before beginning it") @@ -1488,7 +1488,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas }), ) - require.Equal(t, step.ExpectedError, performReferenceUpdates(t, ctx, transaction, rewrittenRepo, step.ReferenceUpdates)) + require.Equal(t, step.ExpectedError, performReferenceUpdates(t, storage.ContextWithTransaction(ctx, transaction), transaction, rewrittenRepo, step.ReferenceUpdates)) case ReadKey: require.Contains(t, openTransactions, step.TransactionID, "test error: read key called on 
transaction before beginning it") @@ -1575,25 +1575,25 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas repo := setup.RepositoryFactory.Build(repoProto) if step.DefaultBranch != "" { - require.NoError(t, repo.SetDefaultBranch(ctx, nil, step.DefaultBranch)) + require.NoError(t, repo.SetDefaultBranch(storage.ContextWithTransaction(ctx, transaction), nil, step.DefaultBranch)) } for _, pack := range step.Packs { - require.NoError(t, repo.UnpackObjects(ctx, bytes.NewReader(pack))) + require.NoError(t, repo.UnpackObjects(storage.ContextWithTransaction(ctx, transaction), bytes.NewReader(pack))) } for name, oid := range step.References { - require.NoError(t, repo.UpdateRef(ctx, name, oid, setup.ObjectHash.ZeroOID)) + require.NoError(t, repo.UpdateRef(storage.ContextWithTransaction(ctx, transaction), name, oid, setup.ObjectHash.ZeroOID)) } if step.CustomHooks != nil { require.NoError(t, - repoutil.SetCustomHooks(ctx, logger, config.NewLocator(setup.Config), nil, bytes.NewReader(step.CustomHooks), repo), + repoutil.SetCustomHooks2(storage.ContextWithTransaction(ctx, transaction), logger, config.NewLocator(setup.Config), nil, bytes.NewReader(step.CustomHooks), repo, true), ) } if step.Alternate != "" { - repoPath, err := repo.Path(ctx) + repoPath, err := repo.Path(storage.ContextWithTransaction(ctx, transaction)) require.NoError(t, err) alternatesPath := stats.AlternatesFilePath(repoPath) @@ -1636,7 +1636,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] - RequireRepositories(t, ctx, setup.Config, + RequireRepositories(t, storage.ContextWithTransaction(ctx, transaction), setup.Config, // Assert the contents of the transaction's snapshot. 
filepath.Join(setup.Config.Storages[0].Path, transaction.snapshot.Prefix()), // Rewrite all of the repositories to point to their snapshots. diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 4b6717850f4..62632be0624 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -26,6 +26,7 @@ import ( housekeepingcfg "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/reftable" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" @@ -413,20 +414,23 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn.fs = fsrecorder.NewFS(txn.snapshot.Root(), txn.walEntry) if txn.repositoryTarget() { - txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath)) + //txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath)) + txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.relativePath) if err != nil { return nil, fmt.Errorf("does repository exist: %w", err) } - txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath)) + //txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath)) + txn.snapshotRepository = mgr.repositoryFactory.Build(txn.relativePath) if txn.write { if txn.repositoryExists { + ctxWithTxn := storage.ContextWithTransaction(ctx, txn) txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine") if err := 
os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), mode.Directory); err != nil { return nil, fmt.Errorf("create quarantine directory: %w", err) } - txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctx, txn.quarantineDirectory) + txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctxWithTxn, txn.quarantineDirectory) if err != nil { return nil, fmt.Errorf("quarantine: %w", err) } @@ -436,13 +440,13 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti return nil, fmt.Errorf("create reference recorder tmp dir: %w", err) } - refBackend, err := txn.snapshotRepository.ReferenceBackend(ctx) + refBackend, err := txn.snapshotRepository.ReferenceBackend(ctxWithTxn) if err != nil { return nil, fmt.Errorf("reference backend: %w", err) } if refBackend == git.ReferenceBackendFiles { - objectHash, err := txn.snapshotRepository.ObjectHash(ctx) + objectHash, err := txn.snapshotRepository.ObjectHash(ctxWithTxn) if err != nil { return nil, fmt.Errorf("object hash: %w", err) } @@ -453,7 +457,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti } if refBackend == git.ReferenceBackendReftables { - snapshotRepositoryPath, err := txn.snapshotRepository.Path(ctx) + snapshotRepositoryPath, err := txn.snapshotRepository.Path(ctxWithTxn) if err != nil { return nil, fmt.Errorf("snapshot repository path: %w", err) } @@ -516,7 +520,8 @@ func (txn *Transaction) PartitionRelativePaths() []string { // the repository in the transaction's snapshot. 
func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.Repository { rewritten := proto.Clone(repo).(*gitalypb.Repository) - rewritten.RelativePath = txn.snapshot.RelativePath(repo.GetRelativePath()) + //rewritten.RelativePath = txn.snapshot.RelativePath(repo.GetRelativePath()) + //rewritten.RelativePath = repo.GetRelativePath() if repo.GetRelativePath() == txn.relativePath { rewritten.GitObjectDirectory = txn.snapshotRepository.GetGitObjectDirectory() @@ -1078,6 +1083,7 @@ type resultChannel chan commitResult // commit queues the transaction for processing and returns once the result has been determined. func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) (storage.LSN, error) { span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() transaction.result = make(resultChannel, 1) @@ -1229,6 +1235,7 @@ func (txn *Transaction) referenceUpdatesToProto() []*gitalypb.LogEntry_Reference // complete state and stages it into the transaction for committing. func (mgr *TransactionManager) stageRepositoryCreation(ctx context.Context, transaction *Transaction) error { span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.stageRepositoryCreation", nil) + //ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() objectHash, err := transaction.snapshotRepository.ObjectHash(ctx) @@ -1264,23 +1271,29 @@ func (mgr *TransactionManager) stageRepositoryCreation(ctx context.Context, tran // setupStagingRepository sets up a snapshot that is used for verifying and staging changes. It contains up to // date state of the partition. It does not have the quarantine configured. 
-func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, transaction *Transaction) (*localrepo.Repo, error) { +func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, transaction *Transaction) (context.Context, *localrepo.Repo, error) { defer trace.StartRegion(ctx, "setupStagingRepository").End() span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.setupStagingRepository", nil) defer span.Finish() if transaction.stagingSnapshot != nil { - return nil, errors.New("staging snapshot already setup") + return ctx, nil, errors.New("staging snapshot already setup") } var err error transaction.stagingSnapshot, err = mgr.snapshotManager.GetSnapshot(ctx, []string{transaction.relativePath}, true) if err != nil { - return nil, fmt.Errorf("new snapshot: %w", err) + return ctx, nil, fmt.Errorf("new snapshot: %w", err) } - return mgr.repositoryFactory.Build(transaction.stagingSnapshot.RelativePath(transaction.relativePath)), nil + // a wrapped ctx that point to staging root + stagingTxn := &Transaction{ + fs: fsrecorder.NewFS(transaction.stagingSnapshot.Root(), nil), + } + stagingCtx := storage.ContextWithTransaction(ctx, stagingTxn) + + return stagingCtx, mgr.repositoryFactory.Build(transaction.relativePath), nil } // packPrefixRegexp matches the output of `git index-pack` where it @@ -1307,7 +1320,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return nil } - if _, err := os.Stat(mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath())); err != nil { + if _, err := os.Stat(mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath())); err != nil { if !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("stat: %w", err) } @@ -1318,6 +1331,8 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra } span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.packObjects", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) + defer 
span.Finish() // We want to only pack the objects that are present in the quarantine as they are potentially @@ -1494,7 +1509,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra // why we don't track 'tables.list' operation here. func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, transaction *Transaction) error { runPackRefs := transaction.runHousekeeping.packRefs - repoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) if err := allowReftableCompaction(repoPath); err != nil { return fmt.Errorf("allow reftable compaction: %w", err) @@ -1590,7 +1605,7 @@ func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transac // First walk to collect the list of loose refs. looseReferences := make(map[git.ReferenceName]struct{}) - repoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) if err := filepath.WalkDir(filepath.Join(repoPath, "refs"), func(path string, entry fs.DirEntry, err error) error { if err != nil { return err @@ -1764,7 +1779,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned transaction.result <- func() commitResult { var zeroOID git.ObjectID if transaction.repositoryTarget() { - repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath) + repositoryExists, err := mgr.doesRepositoryExist(context.Background(), transaction.relativePath) // txn-free ctx resolves under mgr.storagePath; nil panics in trace.StartRegion if err != nil { return commitResult{error: fmt.Errorf("does repository exist: %w", err)} } @@ -1801,7 +1816,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned if refBackend == git.ReferenceBackendReftables || transaction.runHousekeeping != nil { if refBackend == git.ReferenceBackendReftables { if err := transaction.reftableRecorder.stageTables(ctx, -
mgr.getAbsolutePath(transaction.relativePath), + mgr.getAbsolutePath(ctx, transaction.relativePath), transaction, ); err != nil { return commitResult{error: fmt.Errorf("stage tables: %w", err)} @@ -1844,6 +1859,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned // Prepare the transaction to conflict check it. We'll commit it later if we // succeed logging the transaction. mgr.mutex.Lock() + //conflictMgrCtx := storage.ContextWithTransaction(ctx, transaction) preparedTX, err := mgr.conflictMgr.Prepare(ctx, &conflict.Transaction{ ReadLSN: transaction.SnapshotLSN(), TargetRelativePath: transaction.relativePath, @@ -1956,6 +1972,18 @@ func (mgr *TransactionManager) verifyFileSystemOperations(ctx context.Context, t } if err := fsTX.Remove(path); err != nil { + //entries, err := os.ReadDir(path) + //var entriesString []string + //fmt.Printf("Contents of directory '%s':\n", path) + //for _, entry := range entries { + // if entry.IsDir() { + // fmt.Printf("[DIR] %s\n", entry.Name()) + // } else { + // fmt.Printf("[FILE] %s\n", entry.Name()) + // } + // entriesString = append(entriesString, entry.Name()) + //} + //return nil, fmt.Errorf("remove: dirs %v, %w", entriesString, err) return nil, fmt.Errorf("remove: %w", err) } } @@ -2103,7 +2131,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relativePath string) (bool, error) { defer trace.StartRegion(ctx, "doesRepositoryExist").End() - stat, err := os.Stat(mgr.getAbsolutePath(relativePath)) + stat, err := os.Stat(mgr.getAbsolutePath(ctx, relativePath)) if err != nil { if errors.Is(err, fs.ErrNotExist) { return false, nil @@ -2119,8 +2147,15 @@ func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relative return true, nil } -// getAbsolutePath returns the relative path's absolute path in the storage. 
-func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string { +// getAbsolutePath returns the absolute path of relativePath. If ctx carries a transaction, the path is joined onto +// that transaction's file system root; otherwise (including a nil ctx) it is joined onto mgr.storagePath. +func (mgr *TransactionManager) getAbsolutePath(ctx context.Context, relativePath ...string) string { + if ctx != nil { + if txn := storage.ExtractTransaction(ctx); txn != nil { + return filepath.Join(append([]string{txn.FS().Root()}, relativePath...)...) + } + } + return filepath.Join(append([]string{mgr.storagePath}, relativePath...)...) } @@ -2129,6 +2164,196 @@ func packFilePath(walFiles string) string { return filepath.Join(walFiles, "transaction.pack") } +// verifyReferences verifies that the references in the transaction apply on top of the already accepted +// reference changes. The old tips in the transaction are verified against the current actual tips. +// It returns an error if the staging repository cannot be set up or quarantined, or if applying the updates fails. +func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction *Transaction) error { + defer trace.StartRegion(ctx, "verifyReferences").End() + + if len(transaction.referenceUpdates) == 0 { + return nil + } + + span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.verifyReferences", nil) + defer span.Finish() + + stagingCtx, stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) + if err != nil { + return fmt.Errorf("setup staging snapshot: %w", err) + } + + // Apply quarantine to the staging repository in order to ensure the new objects are available when we + // are verifying references. Without it we'd encounter errors about missing objects as the new objects + // are not in the repository.
+ stagingRepositoryWithQuarantine, err := stagingRepository.Quarantine(stagingCtx, transaction.quarantineDirectory) + if err != nil { + return fmt.Errorf("quarantine: %w", err) + } + + if err := mgr.verifyReferencesWithGitForReftables(stagingCtx, transaction.manifest.GetReferenceTransactions(), transaction, stagingRepositoryWithQuarantine); err != nil { + return fmt.Errorf("verify references with git: %w", err) + } + + return nil +} + +// verifyReferencesWithGitForReftables is responsible for converting the logical reference updates +// to transaction operations. +// +// To ensure that we don't modify existing tables and autocompact, we lock the existing tables +// before applying the updates. This way the reftable backend will only create new tables +func (mgr *TransactionManager) verifyReferencesWithGitForReftables( + ctx context.Context, + referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, + tx *Transaction, + repo *localrepo.Repo, +) error { + reftablePath := mgr.getAbsolutePath(ctx, repo.GetRelativePath(), "reftable/") + existingTables := make(map[string]struct{}) + lockedTables := make(map[string]struct{}) + + // reftableWalker allows us to walk the reftable directory. + reftableWalker := func(handler func(path string) error) fs.WalkDirFunc { + return func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + if filepath.Base(path) == "reftable" { + return nil + } + + return fmt.Errorf("unexpected directory: %s", filepath.Base(path)) + } + + return handler(path) + } + } + + // We first track the existing tables in the reftable directory. + if err := filepath.WalkDir( + reftablePath, + reftableWalker(func(path string) error { + if filepath.Base(path) == "tables.list" { + return nil + } + + existingTables[path] = struct{}{} + + return nil + }), + ); err != nil { + return fmt.Errorf("finding reftables: %w", err) + } + + // We then lock existing tables as to disable the autocompaction. 
+ for table := range existingTables { + lockedPath := table + ".lock" + + f, err := os.Create(lockedPath) + if err != nil { + return fmt.Errorf("creating reftable lock: %w", err) + } + if err = f.Close(); err != nil { + return fmt.Errorf("closing reftable lock: %w", err) + } + + lockedTables[lockedPath] = struct{}{} + } + + // Since autocompaction is now disabled, adding references will + // add new tables but not compact them. + for _, referenceTransaction := range referenceTransactions { + if err := mgr.applyReferenceTransaction(ctx, referenceTransaction.GetChanges(), repo); err != nil { + return fmt.Errorf("applying reference: %w", err) + } + } + + // With this, we can track the new tables added along with the 'tables.list' + // as operations on the transaction. + if err := filepath.WalkDir( + reftablePath, + reftableWalker(func(path string) error { + if _, ok := lockedTables[path]; ok { + return nil + } + + if _, ok := existingTables[path]; ok { + return nil + } + + base := filepath.Base(path) + + if base == "tables.list" { + tx.walEntry.RemoveDirectoryEntry(filepath.Join(tx.relativePath, "reftable", base)) + } + return tx.walEntry.CreateFile(path, filepath.Join(tx.relativePath, "reftable", base)) + }), + ); err != nil { + return fmt.Errorf("finding reftables: %w", err) + } + + // Finally release the locked tables. + for lockedTable := range lockedTables { + if err := os.Remove(lockedTable); err != nil { + return fmt.Errorf("deleting locked file: %w", err) + } + } + + return nil +} + +// applyReferenceTransaction applies a reference transaction with `git update-ref`. 
+func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, changes []*gitalypb.LogEntry_ReferenceTransaction_Change, repository *localrepo.Repo) (returnedErr error) { + defer trace.StartRegion(ctx, "applyReferenceTransaction").End() + + updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions(), updateref.WithNoDeref()) + if err != nil { + return fmt.Errorf("new: %w", err) + } + defer func() { + if err := updater.Close(); err != nil { + returnedErr = errors.Join(returnedErr, fmt.Errorf("close updater: %w", err)) + } + }() + + if err := updater.Start(); err != nil { + return fmt.Errorf("start: %w", err) + } + + version, err := repository.GitVersion(ctx) + if err != nil { + return fmt.Errorf("git version: %w", err) + } + + for _, change := range changes { + if len(change.GetNewTarget()) > 0 { + if err := updater.UpdateSymbolicReference( + version, + git.ReferenceName(change.GetReferenceName()), + git.ReferenceName(change.GetNewTarget()), + ); err != nil { + return fmt.Errorf("update symref %q: %w", change.GetReferenceName(), err) + } + } else { + if err := updater.Update(git.ReferenceName(change.GetReferenceName()), git.ObjectID(change.GetNewOid()), ""); err != nil { + return fmt.Errorf("update %q: %w", change.GetReferenceName(), err) + } + } + } + + if err := updater.Prepare(); err != nil { + return fmt.Errorf("prepare: %w", err) + } + + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit: %w", err) + } + + return nil +} + // appendLogEntry appends a log entry of a transaction to the write-ahead log. After the log entry is appended to WAL, // the corresponding snapshot lock and in-memory reference for the latest appended LSN is created. 
func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go index e607bb8d7d6..b1494c12e9a 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go @@ -122,6 +122,7 @@ func (mgr *TransactionManager) prepareHousekeeping(ctx context.Context, transact } span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareHousekeeping", nil) +
ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "prepare") @@ -214,8 +215,8 @@ func (mgr *TransactionManager) prepareRepacking(ctx context.Context, transaction // Build a working repository pointing to snapshot repository. Housekeeping task can access the repository // without the needs for quarantine. - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) - repoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) + repoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) isFullRepack, err := housekeeping.ValidateRepacking(repack.config) if err != nil { @@ -422,7 +423,7 @@ func (mgr *TransactionManager) prepareCommitGraphs(ctx context.Context, transact } if err := housekeeping.WriteCommitGraph(ctx, - mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)), + mgr.repositoryFactory.Build(transaction.relativePath), transaction.runHousekeeping.writeCommitGraphs.config, ); err != nil { return fmt.Errorf("re-writing commit graph: %w", err) @@ -471,6 +472,7 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti defer trace.StartRegion(ctx, "verifyHousekeeping").End() span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.verifyHousekeeping", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "verify") @@ -589,7 +591,7 @@ func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction // Setup a working repository of the destination repository and all changes of current transactions. All // concurrent changes must land in that repository already. 
- stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) + stagingCtx, stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) if err != nil { return fmt.Errorf("setting up new snapshot for verifying repacking: %w", err) } @@ -626,7 +628,7 @@ func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction return fmt.Errorf("walking committed entries: %w", err) } - if err := mgr.verifyObjectsExist(ctx, stagingRepository, objectDependencies); err != nil { + if err := mgr.verifyObjectsExist(stagingCtx, stagingRepository, objectDependencies); err != nil { var errInvalidObject localrepo.InvalidObjectError if errors.As(err, &errInvalidObject) { return errRepackConflictPrunedObject @@ -653,13 +655,13 @@ func (mgr *TransactionManager) verifyPackRefsReftable(transaction *Transaction) // repository before the compaction. However, concurrent writes might have occurred which // wrote new tables to the target repository. We shouldn't loose that data. So we merge // the compacted tables.list with the newer tables from the target repository's tables.list. - repoPath := mgr.getAbsolutePath(transaction.relativePath) + repoPath := mgr.getAbsolutePath(nil, transaction.relativePath) newTableList, err := reftable.ReadTablesList(repoPath) if err != nil { return nil, fmt.Errorf("reading tables.list: %w", err) } - snapshotRepoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + snapshotRepoPath := mgr.getAbsolutePath(storage.ContextWithTransaction(context.Background(), transaction), transaction.snapshotRepository.GetRelativePath()) // tables.list is hard-linked from the repository to the snapshot, we shouldn't // directly write to it as we'd modify the original. So let's remove the @@ -726,6 +728,7 @@ func (mgr *TransactionManager) verifyPackRefsReftable(transaction *Transaction) // parsing and regenerating the packed-refs file. So, let's settle down with a conflict error at this point. 
func (mgr *TransactionManager) verifyPackRefsFiles(ctx context.Context, transaction *Transaction, zeroOID git.ObjectID) (*gitalypb.LogEntry_Housekeeping_PackRefs, error) { packRefs := transaction.runHousekeeping.packRefs + ctx = storage.ContextWithTransaction(ctx, transaction) // Check for any concurrent ref deletion between this transaction's snapshot LSN to the end. if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry, objectDependencies map[git.ObjectID]struct{}) error { @@ -788,7 +791,7 @@ func (mgr *TransactionManager) verifyPackRefsFiles(ctx context.Context, transact if isDir { // If this is a directory, we need to ensure it is actually empty before removing // it. Check if we find any directory entries we haven't yet deleted. - entries, err := os.ReadDir(mgr.getAbsolutePath(relativePath)) + entries, err := os.ReadDir(mgr.getAbsolutePath(nil, relativePath)) if err != nil { return fmt.Errorf("read dir: %w", err) } @@ -843,10 +846,10 @@ func (mgr *TransactionManager) prepareOffloading(ctx context.Context, transactio // Loading configurations for offloading cfg := transaction.runHousekeeping.runOffloading.config - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) // Find the original repository's absolute path. In the context of transaction, originalRepo is the repo // which we are taking a snapshot of. 
originalRepo := &gitalypb.Repository{ @@ -979,10 +982,10 @@ func (mgr *TransactionManager) prepareRehydrating(ctx context.Context, transacti span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareRehydrating", nil) defer span.Finish() - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) prefix := transaction.runHousekeeping.runRehydrating.prefix packFilesToDownload, err := mgr.offloadingSink.List(ctx, prefix) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index e1a99c5bf22..d08f6504b00 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -372,8 +372,9 @@ func TestTransactionManager(t *testing.T) { "Housekeeping/CommitGraphs": generateHousekeepingCommitGraphsTests(t, ctx, setup), "Consumer": generateConsumerTests(t, setup), "KeyValue": generateKeyValueTests(t, setup), - "Offloading": generateOffloadingTests(t, ctx, testPartitionID, relativePath), - "Rehydrating": generateRehydratingTests(t, ctx, testPartitionID, relativePath), + + "Offloading": generateOffloadingTests(t, ctx, testPartitionID, relativePath), + "Rehydrating": generateRehydratingTests(t, ctx, testPartitionID, relativePath), } for desc, tests := range subTests { @@ -507,8 +508,8 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio desc: "commit returns if transaction processing stops before transaction acceptance", skip: func(t *testing.T) { testhelper.SkipWithRaft(t, `The hook is installed before appending log entry, before - recorder is activated. Hence, it's not feasible to differentiate between normal - entries and Raft internal entries`) + recorder is activated. 
Hence, it's not feasible to differentiate between normal + entries and Raft internal entries`) }, steps: steps{ StartManager{ diff --git a/internal/tempdir/clean.go b/internal/tempdir/clean.go index cdd957fdf35..7d40b30d57b 100644 --- a/internal/tempdir/clean.go +++ b/internal/tempdir/clean.go @@ -63,7 +63,7 @@ func clean(logger log.Logger, locator storage.Locator, storage config.Storage) e ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dir, err := locator.TempDir(storage.Name) + dir, err := locator.TempDir(ctx, storage.Name) if err != nil { return fmt.Errorf("temporary dir: %w", err) } diff --git a/internal/tempdir/clean_test.go b/internal/tempdir/clean_test.go index a608ad8fe00..fb66416fce7 100644 --- a/internal/tempdir/clean_test.go +++ b/internal/tempdir/clean_test.go @@ -1,6 +1,7 @@ package tempdir import ( + "context" "os" "path/filepath" "testing" @@ -19,7 +20,7 @@ func TestCleanSuccess(t *testing.T) { cfg := testcfg.Build(t) locator := config.NewLocator(cfg) - cleanRoot, err := locator.TempDir(cfg.Storages[0].Name) + cleanRoot, err := locator.TempDir(context.Background(), cfg.Storages[0].Name) require.NoError(t, err) require.NoError(t, os.MkdirAll(cleanRoot, mode.Directory), "create clean root before setup") @@ -90,7 +91,7 @@ type mockLocator struct { storage.Locator } -func (m mockLocator) TempDir(storageName string) (string, error) { +func (m mockLocator) TempDir(ctx context.Context, storageName string) (string, error) { return "something", nil } @@ -129,19 +130,19 @@ func TestDedupStorages(t *testing.T) { } func chmod(t *testing.T, locator storage.Locator, storage config.Storage, p string, mode os.FileMode) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) require.NoError(t, os.Chmod(filepath.Join(root, p), mode)) } func chtimes(t *testing.T, locator storage.Locator, storage config.Storage, p string, date time.Time) { - root, err := locator.TempDir(storage.Name) + root, err
:= locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) require.NoError(t, os.Chtimes(filepath.Join(root, p), date, date)) } func assertEntries(t *testing.T, locator storage.Locator, storage config.Storage, entries ...string) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) foundEntries, err := os.ReadDir(root) @@ -155,7 +156,7 @@ func assertEntries(t *testing.T, locator storage.Locator, storage config.Storage } func makeFile(t *testing.T, locator storage.Locator, storage config.Storage, filePath string, mtime time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) fullPath := filepath.Join(root, filePath) @@ -164,7 +165,7 @@ func makeFile(t *testing.T, locator storage.Locator, storage config.Storage, fil } func makeDir(t *testing.T, locator storage.Locator, storage config.Storage, dirPath string, mtime time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) fullPath := filepath.Join(root, dirPath) diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go index f70c1216ce0..6c288d3ba7d 100644 --- a/internal/tempdir/tempdir.go +++ b/internal/tempdir/tempdir.go @@ -76,7 +76,7 @@ func NewRepository(ctx context.Context, storageName string, logger log.Logger, l } func newDirectory(ctx context.Context, storageName string, prefix string, logger log.Logger, loc storage.Locator) (Dir, error) { - root, err := loc.TempDir(storageName) + root, err := loc.TempDir(ctx, storageName) if err != nil { return Dir{}, fmt.Errorf("temp directory: %w", err) } -- GitLab