diff --git a/internal/cache/walker.go b/internal/cache/walker.go index 9444540c438a65a54e0ed3aaa37c068b0b7ffc95..3dff2aaa7f2ac4c6f8e28bd90c56c320914f70e3 100644 --- a/internal/cache/walker.go +++ b/internal/cache/walker.go @@ -149,7 +149,7 @@ func (c *DiskCache) moveAndClear(storage config.Storage) error { logger := c.logger.WithField("storage", storage.Name) logger.Info("clearing disk cache object folder") - tempPath, err := c.locator.TempDir(storage.Name) + tempPath, err := c.locator.TempDir(nil, storage.Name) if err != nil { return fmt.Errorf("temp dir: %w", err) } diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index 59d57a5eeada68d383d71bf09a62de33926653c1..eb83e429256dc8037597b43e443945379398595b 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -241,7 +241,14 @@ func (c *ProcessCache) getOrCreateProcess( span, ctx := tracing.StartSpanIfHasParent(ctx, spanName, nil) defer span.Finish() - cacheKey, isCacheable := newCacheKey(fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())), repo) + //cacheKey, isCacheable := newCacheKey(fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())), repo) + // TODO make this better, hash the txn.FS().Root()? 
+ sessionID := fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())) + txn := storage.ExtractTransaction(ctx) + if txn != nil { + sessionID = fmt.Sprintf("%d+%s", roundToNearestFiveMinute(time.Now()), txn.FS().Root()) + } + cacheKey, isCacheable := newCacheKey(sessionID, repo) if isCacheable { // We only try to look up cached processes in case it is cacheable, which requires a diff --git a/internal/git/gitcmd/command_factory.go b/internal/git/gitcmd/command_factory.go index a31243ef4cd8ab31691c690b4e113547a2d3a4a4..459fd4f2d62945cda5ebb37728975ecac55b3f41 100644 --- a/internal/git/gitcmd/command_factory.go +++ b/internal/git/gitcmd/command_factory.go @@ -512,9 +512,9 @@ func (cf *ExecCommandFactory) newCommand(ctx context.Context, repo storage.Repos var cgroupsAddCommandOpts []cgroups.AddCommandOption if repo != nil { relativePath := repo.GetRelativePath() - if tx := storage.ExtractTransaction(ctx); tx != nil { - relativePath = tx.OriginalRepository(repo).GetRelativePath() - } + //if tx := storage.ExtractTransaction(ctx); tx != nil { + // relativePath = tx.OriginalRepository(repo).GetRelativePath() + //} cgroupsAddCommandOpts = []cgroups.AddCommandOption{ cgroups.WithCgroupKey(repo.GetStorageName() + "/" + relativePath), } diff --git a/internal/git/gitcmd/command_factory_cgroup_test.go b/internal/git/gitcmd/command_factory_cgroup_test.go index ce50897e629ed695d12c9a871d3d1e14ae5849d3..7e0cafb3f8b5fe31b044de3ecac9f3e06dc9699a 100644 --- a/internal/git/gitcmd/command_factory_cgroup_test.go +++ b/internal/git/gitcmd/command_factory_cgroup_test.go @@ -9,7 +9,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/cgroups" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" 
"gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -65,11 +67,15 @@ func TestNewCommandAddsToCgroup(t *testing.T) { type mockTransaction struct { storage.Transaction originalRepo *gitalypb.Repository + fs storage.FS } func (m *mockTransaction) OriginalRepository(storage.Repository) *gitalypb.Repository { return m.originalRepo } +func (m *mockTransaction) FS() storage.FS { + return m.fs +} func TestNewCommandCgroupStable(t *testing.T) { t.Parallel() @@ -109,10 +115,14 @@ func TestNewCommandCgroupStable(t *testing.T) { require.NoError(t, err) defer cleanup() - originalRepo := &gitalypb.Repository{StorageName: "default", RelativePath: "some/relative/path"} + //originalRepo := &gitalypb.Repository{StorageName: "default", RelativePath: "some/relative/path"} + locator := config.NewLocator(cfg) + storagePath, err := locator.GetStorageByName(ctx, "default") + require.NoError(t, err) ctx = storage.ContextWithTransaction(ctx, &mockTransaction{ - originalRepo: originalRepo, + fs: fsrecorder.NewFS(storagePath, nil), + //originalRepo: originalRepo, }) cmd, err := gitCmdFactory.New(ctx, repo, gitcmd.Command{ @@ -128,6 +138,6 @@ func TestNewCommandCgroupStable(t *testing.T) { require.NotNil(t, customFields) logrusFields := customFields.Fields() - require.Equal(t, originalRepo.GetStorageName()+"/"+originalRepo.GetRelativePath(), logrusFields["command.cgroup_path"]) + require.Equal(t, "default/"+repo.GetRelativePath(), logrusFields["command.cgroup_path"]) }) } diff --git a/internal/git/gitcmd/reference.go b/internal/git/gitcmd/reference.go index a54f46743564be18fe391b3d5eec7584a12fe599..4cc7a3379ed9f6a816874ec12ec93ed169ef7cff 100644 --- a/internal/git/gitcmd/reference.go +++ b/internal/git/gitcmd/reference.go @@ -66,10 +66,11 @@ func GetReferences(ctx context.Context, repoExecutor RepositoryExecutor, cfg Get // GetSymbolicRef reads the symbolic reference. 
func GetSymbolicRef(ctx context.Context, repoExecutor RepositoryExecutor, refname git.ReferenceName) (git.Reference, error) { var stdout strings.Builder + var stderr strings.Builder if err := repoExecutor.ExecAndWait(ctx, Command{ Name: "symbolic-ref", Args: []string{string(refname)}, - }, WithDisabledHooks(), WithStdout(&stdout)); err != nil { + }, WithDisabledHooks(), WithStdout(&stdout), WithStderr(&stderr)); err != nil { return git.Reference{}, err } diff --git a/internal/git/gittest/repo.go b/internal/git/gittest/repo.go index fb15ffff9465cd974b5c26ea0d3b1ede9c343d8a..69a7c80faf6ea86b91b35905ea210712c4715983 100644 --- a/internal/git/gittest/repo.go +++ b/internal/git/gittest/repo.go @@ -130,6 +130,8 @@ func CreateRepository(tb testing.TB, ctx context.Context, cfg config.Cfg, config require.Less(tb, len(configs), 2, "you must either pass no or exactly one option") + //ffSet := featureflag.FromContext(ctx) + opts := CreateRepositoryConfig{} if len(configs) == 1 { opts = configs[0] @@ -205,7 +207,11 @@ func CreateRepository(tb testing.TB, ctx context.Context, cfg config.Cfg, config tb.Cleanup(func() { // The ctx parameter would be canceled by now as the tests defer the cancellation. 
- if _, err := client.RemoveRepository(context.TODO(), &gitalypb.RemoveRepositoryRequest{ + ctx := context.TODO() + //for k, v := range ffSet { + // ctx = featureflag.ContextWithFeatureFlag(ctx, k, v) + //} + if _, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: repository, }); err != nil { if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { diff --git a/internal/git/localrepo/paths.go b/internal/git/localrepo/paths.go index e463334b7361214b418543f12b3ed42113e94f13..aa15f70d4bd6b58b3a69e0870cde3c4802881e3f 100644 --- a/internal/git/localrepo/paths.go +++ b/internal/git/localrepo/paths.go @@ -59,7 +59,7 @@ func (repo *Repo) ObjectDirectoryPath(ctx context.Context) (string, error) { // have a repository-specific prefix which we must check in order to determine whether the // quarantine directory does in fact belong to the repo at hand. if _, origError := storage.ValidateRelativePath(repoPath, objectDirectoryPath); origError != nil { - tempDir, err := repo.locator.TempDir(repo.GetStorageName()) + tempDir, err := repo.locator.TempDir(ctx, repo.GetStorageName()) if err != nil { return "", structerr.NewInvalidArgument("getting storage's temporary directory: %w", err) } diff --git a/internal/git/localrepo/repo.go b/internal/git/localrepo/repo.go index 514c9eba8b5fd01c49a809678ae474f97c338113..2d19d8a9eb3c56d906cc3445369cbf863c4f50d4 100644 --- a/internal/git/localrepo/repo.go +++ b/internal/git/localrepo/repo.go @@ -222,8 +222,8 @@ func errorWithStderr(err error, stderr []byte) error { // StorageTempDir returns the temporary dir for the storage where the repo is on. // When this directory does not exist yet, it's being created. 
-func (repo *Repo) StorageTempDir() (string, error) { - tempPath, err := repo.locator.TempDir(repo.GetStorageName()) +func (repo *Repo) StorageTempDir(ctx context.Context) (string, error) { + tempPath, err := repo.locator.TempDir(ctx, repo.GetStorageName()) if err != nil { return "", err } diff --git a/internal/git/localrepo/repo_test.go b/internal/git/localrepo/repo_test.go index 8cfa3f0120b7beae7c2fc24a600b2d4630f78fc2..cd631e835e67bd18638f0bfbcd717248ecb9b047 100644 --- a/internal/git/localrepo/repo_test.go +++ b/internal/git/localrepo/repo_test.go @@ -260,11 +260,11 @@ func TestRepo_StorageTempDir(t *testing.T) { }) repo := New(testhelper.NewLogger(t), locator, gitCmdFactory, catfileCache, repoProto) - expected, err := locator.TempDir(cfg.Storages[0].Name) + expected, err := locator.TempDir(nil, cfg.Storages[0].Name) require.NoError(t, err) require.NoDirExists(t, expected) - tempPath, err := repo.StorageTempDir() + tempPath, err := repo.StorageTempDir(nil) require.NoError(t, err) require.DirExists(t, expected) require.Equal(t, expected, tempPath) diff --git a/internal/git/objectpool/pool.go b/internal/git/objectpool/pool.go index cd1d297ee8c0564ce9a76bbbf3b97847ef16080e..46e93acf1405e58792b1b49b70aeb6fcea91fbc2 100644 --- a/internal/git/objectpool/pool.go +++ b/internal/git/objectpool/pool.go @@ -64,7 +64,7 @@ func FromProto( // When creating repositories in the ObjectPool service we will first create the // repository in a temporary directory. So we need to check whether the path we see // here is in such a temporary directory and let it pass. 
- tempDir, err := locator.TempDir(proto.GetRepository().GetStorageName()) + tempDir, err := locator.TempDir(ctx, proto.GetRepository().GetStorageName()) if err != nil { return nil, fmt.Errorf("getting temporary storage directory: %w", err) } diff --git a/internal/gitaly/config/locator.go b/internal/gitaly/config/locator.go index bc6bc2740e6cfc4d1cc31eaa0f1f809ef5dcb880..2f8f7d568e0f7d6d0f54951c6eed350ef2c908dc 100644 --- a/internal/gitaly/config/locator.go +++ b/internal/gitaly/config/locator.go @@ -149,12 +149,18 @@ func (l *configLocator) GetRepoPath(ctx context.Context, repo storage.Repository } // GetStorageByName will return the path for the storage, which is fetched by -// its key. An error is return if it cannot be found. +// its key. If we are in a transaction, the file system snapshot's root directory is returned. +// An error is returned if it cannot be found. func (l *configLocator) GetStorageByName(ctx context.Context, storageName string) (string, error) { if storageName == "" { return "", structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) } + tx := storage.ExtractTransaction(ctx) + if tx != nil { + return tx.FS().Root(), nil + } + storagePath, ok := l.conf.StoragePath(storageName) if !ok { return "", storage.NewStorageNotFoundError(storageName) @@ -174,7 +180,12 @@ func (l *configLocator) StateDir(storageName string) (string, error) { } // TempDir returns the path to the temp dir for a storage. 
-func (l *configLocator) TempDir(storageName string) (string, error) { +func (l *configLocator) TempDir(ctx context.Context, storageName string) (string, error) { + if ctx != nil { + if txn := storage.ExtractTransaction(ctx); txn != nil { + return filepath.Join(txn.FS().Root(), tmpRootPrefix), nil + } + } return l.getPath(storageName, tmpRootPrefix) } diff --git a/internal/gitaly/config/locator_test.go b/internal/gitaly/config/locator_test.go index 07d95f9e12d2d7b6f67b6ba69fc0177ce7679169..3c957956fe302959e42dcdb518b36edb78039b23 100644 --- a/internal/gitaly/config/locator_test.go +++ b/internal/gitaly/config/locator_test.go @@ -242,20 +242,20 @@ func TestConfigLocator_TempDir(t *testing.T) { locator := config.NewLocator(cfg) t.Run("storage exists", func(t *testing.T) { - path, err := locator.TempDir(storageName) + path, err := locator.TempDir(nil, storageName) require.NoError(t, err) require.Equal(t, path, filepath.Join(cfg.Storages[0].Path, "+gitaly/tmp")) }) t.Run("storage doesn't exist on disk", func(t *testing.T) { require.NoError(t, os.RemoveAll(cfg.Storages[1].Path)) - path, err := locator.TempDir(cfg.Storages[1].Name) + path, err := locator.TempDir(nil, cfg.Storages[1].Name) require.NoError(t, err) require.Equal(t, filepath.Join(cfg.Storages[1].Path, "+gitaly/tmp"), path) }) t.Run("unknown storage", func(t *testing.T) { - _, err := locator.TempDir("unknown") + _, err := locator.TempDir(nil, "unknown") require.Equal(t, structerr.NewInvalidArgument(`tmp dir: no such storage: "unknown"`), err) }) } diff --git a/internal/gitaly/hook/custom.go b/internal/gitaly/hook/custom.go index e1403cf1516155d59b16de114280e9bf16b88864..c55cae769a50165e757d93825170bd7517d065b5 100644 --- a/internal/gitaly/hook/custom.go +++ b/internal/gitaly/hook/custom.go @@ -47,6 +47,7 @@ func (e CustomHookError) Unwrap() error { // // Any files which are either not executable or have a trailing `~` are ignored. 
func (m *GitLabHookManager) newCustomHooksExecutor(ctx context.Context, repo *gitalypb.Repository, hookName string) (customHooksExecutor, error) { + repoPath, err := m.locator.GetRepoPath(ctx, repo) if err != nil { return nil, err @@ -161,8 +162,17 @@ func isValidHook(path string) bool { return true } -func (m *GitLabHookManager) customHooksEnv(ctx context.Context, payload gitcmd.HooksPayload, pushOptions []string, envs []string) ([]string, error) { - repoPath, err := m.locator.GetRepoPath(ctx, payload.Repo, storage.WithRepositoryVerificationSkipped()) +func (m *GitLabHookManager) customHooksEnv(ctx context.Context, payload gitcmd.HooksPayload, pushOptions []string, envs []string, + useCtxToFindRepoPath bool) ([]string, error) { + + var repoPath string + var err error + if useCtxToFindRepoPath { + repoPath, err = m.locator.GetRepoPath(ctx, payload.Repo, storage.WithRepositoryVerificationSkipped()) + } else { + repoPath, err = m.locator.GetRepoPath(nil, payload.Repo, storage.WithRepositoryVerificationSkipped()) + + } if err != nil { return nil, err } diff --git a/internal/gitaly/hook/postreceive.go b/internal/gitaly/hook/postreceive.go index 888d3dfaeea27f3e340b04328850430667767026..784815e91ccfb2aac6b9e674fec35240aa45d352 100644 --- a/internal/gitaly/hook/postreceive.go +++ b/internal/gitaly/hook/postreceive.go @@ -227,12 +227,12 @@ func (m *GitLabHookManager) postReceiveHook(ctx context.Context, payload gitcmd. 
return errors.New("") } - executor, err := m.newCustomHooksExecutor(ctx, repo, "post-receive") + executor, err := m.newCustomHooksExecutor(nil, repo, "post-receive") if err != nil { return structerr.NewInternal("creating custom hooks executor: %w", err) } - customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, env) + customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, env, false) if err != nil { return structerr.NewInternal("constructing custom hook environment: %w", err) } diff --git a/internal/gitaly/hook/prereceive.go b/internal/gitaly/hook/prereceive.go index bf638e7b780d96eac5165a070f635cd3a8878a04..e26d788ffc4c8fea690293507f5272fd44e316fa 100644 --- a/internal/gitaly/hook/prereceive.go +++ b/internal/gitaly/hook/prereceive.go @@ -172,7 +172,7 @@ func (m *GitLabHookManager) preReceiveHook(ctx context.Context, payload gitcmd.H return fmt.Errorf("creating custom hooks executor: %w", err) } - customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, envs) + customEnv, err := m.customHooksEnv(ctx, payload, pushOptions, envs, true) if err != nil { return structerr.NewInternal("constructing custom hook environment: %w", err) } diff --git a/internal/gitaly/hook/referencetransaction.go b/internal/gitaly/hook/referencetransaction.go index 58d3b8c4e84192fd766ebcbf3f018049bb34a8da..5297ec079b38e17f4ab16e4b589ee1b992718cb2 100644 --- a/internal/gitaly/hook/referencetransaction.go +++ b/internal/gitaly/hook/referencetransaction.go @@ -48,6 +48,9 @@ func (m *GitLabHookManager) ReferenceTransactionHook(ctx context.Context, state if err != nil { return fmt.Errorf("get transaction: %w", err) } + if tx != nil { + ctx = storage.ContextWithTransaction(ctx, tx) + } } var phase voting.Phase diff --git a/internal/gitaly/hook/update.go b/internal/gitaly/hook/update.go index ee61df4224624812b0266c142c8a6c4735cd5cae..1d03cb098f76a4ec14eef93b2cfe6a99749e42a3 100644 --- a/internal/gitaly/hook/update.go +++ b/internal/gitaly/hook/update.go @@ -63,7 +63,7 @@ func (m 
*GitLabHookManager) updateHook(ctx context.Context, payload gitcmd.Hooks return structerr.NewInternal("%w", err) } - customEnv, err := m.customHooksEnv(ctx, payload, nil, env) + customEnv, err := m.customHooksEnv(ctx, payload, nil, env, true) if err != nil { return structerr.NewInternal("constructing custom hook environment: %w", err) } diff --git a/internal/gitaly/linguist/language_stats.go b/internal/gitaly/linguist/language_stats.go index 1b432b6d1cf8cb974a2f76249685182cbcd528b5..d130c43f8523a69348af628105b2cfa01b041c3a 100644 --- a/internal/gitaly/linguist/language_stats.go +++ b/internal/gitaly/linguist/language_stats.go @@ -127,7 +127,7 @@ func (c *languageStats) save(ctx context.Context, repo *localrepo.Repo, commitID return fmt.Errorf("languageStats save get repo path: %w", err) } - tempPath, err := repo.StorageTempDir() + tempPath, err := repo.StorageTempDir(ctx) if err != nil { return fmt.Errorf("languageStats locate temp dir: %w", err) } diff --git a/internal/gitaly/repoutil/custom_hooks.go b/internal/gitaly/repoutil/custom_hooks.go index 39448676feab10f75a35c1ac04ffb754517a77a4..fd06577e7fa6ef931ff9ccc7d19446840b3da456 100644 --- a/internal/gitaly/repoutil/custom_hooks.go +++ b/internal/gitaly/repoutil/custom_hooks.go @@ -112,6 +112,7 @@ func SetCustomHooks( var originalCustomHooksRelativePath string if tx := storage.ExtractTransaction(ctx); tx != nil { + //originalRelativePath := repo.GetRelativePath() originalRelativePath, err := filepath.Rel(tx.FS().Root(), repoPath) if err != nil { return fmt.Errorf("original relative path: %w", err) @@ -243,6 +244,156 @@ func SetCustomHooks( return nil } +// SetCustomHooks2 is used by the test helpers first. We need a ctx with the txn so that we get FS().Root() as +// the storage path, but we don't want it to run the WAL-related logic ... 
+func SetCustomHooks2( + ctx context.Context, + logger log.Logger, + locator storage.Locator, + txManager transaction.Manager, + reader io.Reader, + repo storage.Repository, + skipTxn bool, +) error { + repoPath, err := locator.GetRepoPath(ctx, repo) + if err != nil { + return fmt.Errorf("getting repo path: %w", err) + } + + var originalCustomHooksRelativePath string + if tx := storage.ExtractTransaction(ctx); tx != nil && !skipTxn { + //originalRelativePath := repo.GetRelativePath() + originalRelativePath, err := filepath.Rel(tx.FS().Root(), repoPath) + if err != nil { + return fmt.Errorf("original relative path: %w", err) + } + + originalCustomHooksRelativePath = filepath.Join(originalRelativePath, CustomHooksDir) + + // Log a deletion of the existing custom hooks so they are removed before the + // new ones are put in place. + if err := storage.RecordDirectoryRemoval( + tx.FS(), tx.FS().Root(), originalCustomHooksRelativePath, + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("record custom hook removal: %w", err) + } + } + + // The `custom_hooks` directory in the repository is locked to prevent + // concurrent modification of hooks. + hooksLock, err := safe.NewLockingDirectory(repoPath, CustomHooksDir) + if err != nil { + return fmt.Errorf("creating hooks lock: %w", err) + } + + if err := hooksLock.Lock(); err != nil { + return fmt.Errorf("locking hooks: %w", err) + } + defer func() { + // If the `.lock` file is not removed from the `custom_hooks` directory, + // future modifications to the repository's hooks will be prevented. If + // this occurs, the `.lock` file will have to be manually removed. + if err := hooksLock.Unlock(); err != nil { + logger.WithError(err).ErrorContext(ctx, "failed to unlock hooks") + } + }() + + // Create a temporary directory to write the new hooks to and also + // temporarily store the current repository hooks. 
This enables "atomic" + // directory swapping by acting as an intermediary storage location between + // moves. + tmpDir, err := tempdir.NewWithoutContext(repo.GetStorageName(), logger, locator) + if err != nil { + return fmt.Errorf("creating temp directory: %w", err) + } + + defer func() { + if err := os.RemoveAll(tmpDir.Path()); err != nil { + logger.WithError(err).WarnContext(ctx, "failed to remove temporary directory") + } + }() + + if err := ExtractHooks(ctx, logger, reader, tmpDir.Path(), false); err != nil { + return fmt.Errorf("extracting hooks: %w", err) + } + + tempHooksPath := filepath.Join(tmpDir.Path(), CustomHooksDir) + + // No hooks will be extracted if the tar archive is empty. If this happens + // it means the repository should be set with an empty `custom_hooks` + // directory. Create `custom_hooks` in the temporary directory so that any + // existing repository hooks will be replaced with this empty directory. + if err := os.Mkdir(tempHooksPath, mode.Directory); err != nil && !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("making temp hooks directory: %w", err) + } + + preparedVote, err := newDirectoryVote(tempHooksPath) + if err != nil { + return fmt.Errorf("generating prepared vote: %w", err) + } + + // Cast prepared vote with hash of the extracted archive in the temporary + // `custom_hooks` directory. + if err := voteCustomHooks(ctx, txManager, preparedVote, voting.Prepared); err != nil { + return fmt.Errorf("casting prepared vote: %w", err) + } + + repoHooksPath := filepath.Join(repoPath, CustomHooksDir) + prevHooksPath := filepath.Join(tmpDir.Path(), "previous_hooks") + + // If the `custom_hooks` directory exists in the repository, move the + // current hooks to `previous_hooks` in the temporary directory. 
+ if err := os.Rename(repoHooksPath, prevHooksPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("moving current hooks to temp: %w", err) + } + + syncer := safe.NewSyncer() + + if storage.NeedsSync(ctx) { + // Sync the custom hooks in the temporary directory before being moved into + // the repository. This makes the move atomic as there is no state where the + // move succeeds, but the hook files themselves are not yet on the disk, or + // are partially written. + if err := syncer.SyncRecursive(ctx, tempHooksPath); err != nil { + return fmt.Errorf("syncing extracted custom hooks: %w", err) + } + } + + // Move `custom_hooks` from the temporary directory to the repository. + if err := os.Rename(tempHooksPath, repoHooksPath); err != nil { + return fmt.Errorf("moving new hooks to repo: %w", err) + } + + if storage.NeedsSync(ctx) { + // Sync the parent directory after a move to ensure the directory entry of the + // hooks directory is flushed to the disk. + if err := syncer.SyncParent(ctx, repoHooksPath); err != nil { + return fmt.Errorf("syncing custom hooks parent directory: %w", err) + } + } + + committedVote, err := newDirectoryVote(repoHooksPath) + if err != nil { + return fmt.Errorf("generating committed vote: %w", err) + } + + // Cast committed vote with hash of the extracted archive in the repository + // `custom_hooks` directory. + if err := voteCustomHooks(ctx, txManager, committedVote, voting.Committed); err != nil { + return fmt.Errorf("casting committed vote: %w", err) + } + + if tx := storage.ExtractTransaction(ctx); tx != nil && !skipTxn { + if err := storage.RecordDirectoryCreation( + tx.FS(), originalCustomHooksRelativePath, + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("record custom hook creation: %w", err) + } + } + + return nil +} + // newDirectoryVote creates a voting.VoteHash by walking the specified path and // generating a hash based on file name, permissions, and data. 
func newDirectoryVote(basePath string) (*voting.VoteHash, error) { diff --git a/internal/gitaly/repoutil/remove.go b/internal/gitaly/repoutil/remove.go index d74429e7224c63231612652110e92143e9769dea..adede28ca9c5c3ce1f3b4932b193e32eba2c4ecc 100644 --- a/internal/gitaly/repoutil/remove.go +++ b/internal/gitaly/repoutil/remove.go @@ -66,7 +66,8 @@ func remove( } } - tempDir, err := locator.TempDir(repository.GetStorageName()) + // Need a tmp folder in OriginalStoragePath + tempDir, err := locator.TempDir(ctx, repository.GetStorageName()) if err != nil { return structerr.NewInternal("temporary directory: %w", err) } diff --git a/internal/gitaly/service/hook/reference_transaction.go b/internal/gitaly/service/hook/reference_transaction.go index 410e3b0a8973e217c0f450396f1690a0103e3b11..7aef4685ce08504008903a9c2f71a49ea5d8033f 100644 --- a/internal/gitaly/service/hook/reference_transaction.go +++ b/internal/gitaly/service/hook/reference_transaction.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" @@ -17,12 +18,32 @@ func validateReferenceTransactionHookRequest(ctx context.Context, locator storag } func (s *server) ReferenceTransactionHook(stream gitalypb.HookService_ReferenceTransactionHookServer) error { + request, err := stream.Recv() if err != nil { return structerr.NewInternal("receiving first request: %w", err) } - if err := validateReferenceTransactionHookRequest(stream.Context(), s.locator, request); err != nil { + // TODO change this to be better + ctx := stream.Context() + var tx storage.Transaction + env := request.GetEnvironmentVariables() + payload, err := gitcmd.HooksPayloadFromEnv(env) + if err == nil { + if payload.TransactionID > 0 { + tx, err = s.txRegistry.Get(payload.TransactionID) + + // TODO in some test, s.txRegistry and 
s.manager.txRegistry is not the same + //if err != nil { + // return fmt.Errorf("get transaction: %w", err) + //} + } + if tx != nil { + ctx = storage.ContextWithTransaction(ctx, tx) + } + } + + if err := validateReferenceTransactionHookRequest(ctx, s.locator, request); err != nil { return structerr.NewInvalidArgument("%w", err) } diff --git a/internal/gitaly/storage/context.go b/internal/gitaly/storage/context.go index dc076310c9d0a2d6ddcba1f59f48d2168512b41d..d99bccc5c4deec2f567310fae53692df00778e54 100644 --- a/internal/gitaly/storage/context.go +++ b/internal/gitaly/storage/context.go @@ -39,6 +39,11 @@ func ContextWithTransaction(ctx context.Context, tx Transaction) context.Context // ExtractTransaction extracts the transaction from the context. Nil is returned if there's // no transaction in the context. func ExtractTransaction(ctx context.Context) Transaction { + + if ctx == nil { + return nil + } + value := ctx.Value(keyTransaction{}) if value == nil { return nil diff --git a/internal/gitaly/storage/locator.go b/internal/gitaly/storage/locator.go index 9be1ae0f6a6d32c42b83544ccbf83f20eb77d2e3..ae7c4afdda5d7d87c8d2ce3d47ba59a89e7f47d2 100644 --- a/internal/gitaly/storage/locator.go +++ b/internal/gitaly/storage/locator.go @@ -92,7 +92,7 @@ type Locator interface { // CacheDir returns the path to the cache dir for a storage. CacheDir(storageName string) (string, error) // TempDir returns the path to the temp dir for a storage. - TempDir(storageName string) (string, error) + TempDir(ctx context.Context, storageName string) (string, error) // StateDir returns the path to the state dir for a storage. StateDir(storageName string) (string, error) // PartitionsDir returns the path to the partitions dir for a storage. 
diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go index 0cd3ca080f70d1acb5d30c89e6a3282be54e038c..69dce18d1dee4f9db39fa9266bc0c399871c23e8 100644 --- a/internal/gitaly/storage/storagemgr/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/middleware_test.go @@ -351,8 +351,10 @@ messages and behavior by erroring out the requests before they even hit this int }, assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) { expected := validAdditionalRepository() - // The additional repository's relative path should have been rewritten. - require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + //// The additional repository's relative path should have been rewritten. + //require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + + require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) }, @@ -384,8 +386,9 @@ messages and behavior by erroring out the requests before they even hit this int }, assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) { expected := validAdditionalRepository() - // The additional repository's relative path should have been rewritten. - require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + //// The additional repository's relative path should have been rewritten. + //require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. 
testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) }, @@ -535,8 +538,13 @@ messages and behavior by erroring out the requests before they even hit this int expectedRepo := validRepository() actualRepo := repo - // When run in a transaction, the relative path will be pointed to the snapshot. - assert.NotEqual(t, expectedRepo.GetRelativePath(), repo.GetRelativePath()) + // When run in a transaction, the relative path will be the same, but the + // transaction should be included in the ctx + tx := storage.ExtractTransaction(ctx) + assert.NotNil(t, tx, "transaction not in the context") + assert.Equal(t, expectedRepo.GetRelativePath(), repo.GetRelativePath(), + "relative path should be the same even when in transaction") + expectedRepo.RelativePath = "" actualRepo.RelativePath = "" diff --git a/internal/gitaly/storage/storagemgr/partition/apply_operations.go b/internal/gitaly/storage/storagemgr/partition/apply_operations.go index a531f804c51e554f781e2750efdd7adf6144efb0..7e498c588b6a0733fbad7b1e62e69412708766f5 100644 --- a/internal/gitaly/storage/storagemgr/partition/apply_operations.go +++ b/internal/gitaly/storage/storagemgr/partition/apply_operations.go @@ -60,6 +60,18 @@ func applyOperations(ctx context.Context, sync func(context.Context, string) err path := string(op.GetPath()) if err := os.Remove(filepath.Join(storageRoot, path)); err != nil && !errors.Is(err, fs.ErrNotExist) { + //entries, err := os.ReadDir(filepath.Join(storageRoot, path)) + //var entriesString []string + //fmt.Printf("Contents of directory '%s':\n", path) + //for _, entry := range entries { + // if entry.IsDir() { + // fmt.Printf("[DIR] %s\n", entry.Name()) + // } else { + // fmt.Printf("[FILE] %s\n", entry.Name()) + // } + // entriesString = append(entriesString, entry.Name()) + //} + //return fmt.Errorf("remove: dirs %v, %w", entriesString, err) return fmt.Errorf("remove: %w", err) } diff --git 
a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go index b4aa3d6f7f613861a825f451692f33fcdd88723b..93c66163ef2411b1e6852c53cf786a01d1f39901 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go @@ -31,7 +31,7 @@ func NewLeftoverFileMigration(locator storage.Locator) Migration { Fn: func(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { // Use snapshotFilter to match entry paths that must be kept in the repo. snapshotFilter := snapshot.NewRegexSnapshotFilter() - storagePath, err := locator.GetStorageByName(ctx, storageName) + storagePath, err := locator.GetStorageByName(nil, storageName) if err != nil { return fmt.Errorf("resolve storage path: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go index 6004432011d9ae83fded257ae4034aae3d9fdd5e..0d4be0384f55e94fa67193a5441ff03ceeb880ad 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go @@ -6,7 +6,6 @@ import ( "maps" "os" "path/filepath" - "sync" "testing" "github.com/stretchr/testify/require" @@ -22,7 +21,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/transactiontest" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -35,15 +33,7 @@ func TestNewLeftoverFileMigration_WithOrWithoutFeatureFlag(t *testing.T) { } func 
testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { - var mu sync.RWMutex t.Parallel() - cfg := testcfg.Build(t) - - var migrationPtr *[]migration.Migration - var migrations []migration.Migration - migrationPtr = &migrations - repoClient, socket := runGitalyServer(t, cfg, testserver.WithMigrations(migrationPtr)) - cfg.SocketPath = socket for _, tc := range []struct { desc string @@ -60,6 +50,13 @@ func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() + cfg := testcfg.Build(t) + + var migrationPtr *[]migration.Migration + var migrations []migration.Migration + migrationPtr = &migrations + repoClient, socket := runGitalyServer(t, cfg, testserver.WithMigrations(migrationPtr)) + cfg.SocketPath = socket poolSetup := createLeftoverMigrationRepo(t, ctx, cfg, true, "") repoSetup := createLeftoverMigrationRepo(t, ctx, cfg, false, poolSetup.repoPath) @@ -93,22 +90,12 @@ func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { } // Avoid racing if other test also change the migration slice. - mu.Lock() migrations = []migration.Migration{migration.NewLeftoverFileMigration(config.NewLocator(cfg))} _, err = repoClient.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{ Repository: repoSetup.repo, }) - mu.Unlock() - require.NoError(t, err) - - // Force WAL sync to ensure migration transaction effects are applied to disk - // before checking directory state. Migration commits to WAL but doesn't - // guarantee immediate filesystem application. 
- conn, err := client.New(ctx, cfg.SocketPath) require.NoError(t, err) - defer testhelper.MustClose(t, conn) - transactiontest.ForceWALSync(t, ctx, conn, repoSetup.repo) // Verify repo directory testhelper.RequireDirectoryState(t, repoSetup.repoPath, "", repoSetup.expectedRepoDirState) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go index c5d1c0692ac13f66412c62196bab3740cc8e3ac2..d46698e54439b87505f83cb4f4364d0f2e84123d 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go @@ -160,6 +160,8 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B if err != nil { return fmt.Errorf("begin migration update: %w", err) } + ctx = storage.ContextWithTransaction(ctx, txn) + defer func() { if returnedErr != nil { if err := txn.Rollback(ctx); err != nil { diff --git a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go index f692dba227677873b76a53db703f0a5397aa9333..7bcd5e15d24d0189ce7b9242be760f229eec80d7 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go @@ -29,6 +29,9 @@ func NewReferenceBackendMigration( ID: id, Name: name, Fn: func(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { + + ctx = storage.ContextWithTransaction(ctx, tx) + scopedFactory, err := localRepoFactory.ScopeByStorage(ctx, storageName) if err != nil { return fmt.Errorf("creating storage scoped factory: %w", err) diff --git a/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go b/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go index 
8d1d9ca8fab4f0a7c10b0890fef27447712dadf0..da369c6c815f760f46d7ff324a62b7cfa2d1c500 100644 --- a/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go @@ -148,10 +148,10 @@ func TestManager(t *testing.T) { require.Equal(t, fs1.Root(), fs2.Root()) // Writing into shared snapshots is not allowed. - require.ErrorIs(t, os.WriteFile(filepath.Join(fs1.Root(), "some file"), nil, fs.ModePerm), os.ErrPermission) + require.ErrorIs(t, os.WriteFile(filepath.Join(fs1.Root(), "repositories", "some file"), nil, fs.ModePerm), os.ErrPermission) expectedDirectoryState := testhelper.DirectoryState{ - "/": {Mode: ModeReadOnlyDirectory}, + "/": {Mode: mode.Directory}, "/repositories": {Mode: ModeReadOnlyDirectory}, "/repositories/a": {Mode: ModeReadOnlyDirectory}, "/repositories/a/refs": {Mode: ModeReadOnlyDirectory}, @@ -185,7 +185,7 @@ func TestManager(t *testing.T) { require.Equal(t, fs1.Root(), fs2.Root()) expectedDirectoryState := testhelper.DirectoryState{ - "/": {Mode: ModeReadOnlyDirectory}, + "/": {Mode: mode.Directory}, "/repositories": {Mode: ModeReadOnlyDirectory}, "/repositories/a": {Mode: ModeReadOnlyDirectory}, "/repositories/a/refs": {Mode: ModeReadOnlyDirectory}, @@ -217,7 +217,7 @@ func TestManager(t *testing.T) { defer testhelper.MustClose(t, fs1) testhelper.RequireDirectoryState(t, fs1.Root(), "", testhelper.DirectoryState{ - "/": {Mode: ModeReadOnlyDirectory}, + "/": {Mode: mode.Directory}, "/pools": {Mode: ModeReadOnlyDirectory}, "/pools/b": {Mode: ModeReadOnlyDirectory}, "/pools/b/refs": {Mode: ModeReadOnlyDirectory}, diff --git a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go index 0820577d2fbe15c21a62d7d520c21f1325bec84f..683c92caee0d5173e79cb973d8bbf5e2664d4fd2 100644 --- a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go +++ 
b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/gitstorage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" @@ -62,7 +63,7 @@ func (s *snapshot) RelativePath(relativePath string) string { func (s *snapshot) Close() error { if s.readOnly { // Make the directories writable again so we can remove the snapshot. - if err := s.setDirectoryMode(mode.Directory); err != nil { + if err := s.setDirectoryMode(mode.Directory, nil); err != nil { return fmt.Errorf("make writable: %w", err) } } @@ -75,8 +76,35 @@ func (s *snapshot) Close() error { } // setDirectoryMode walks the snapshot and sets each directory's mode to the given mode. -func (s *snapshot) setDirectoryMode(mode fs.FileMode) error { - return storage.SetDirectoryMode(s.root, mode) +func (s *snapshot) setDirectoryMode(mode fs.FileMode, paths []string) error { + //if len(paths) == 0 { + // return storage.SetDirectoryMode(s.root, mode) + //} + //for _, path := range paths { + // if err := storage.SetDirectoryMode(filepath.Join(s.root, path), mode); err != nil { + // return err + // } + //} + //return nil + + entries, err := os.ReadDir(s.root) + if err != nil { + return err + } + + if len(entries) == 0 { + return storage.SetDirectoryMode(s.root, mode) + } + + for _, entry := range entries { + if entry.Name() == config.GitalyDataPrefix { + continue + } + if err := storage.SetDirectoryMode(filepath.Join(s.root, entry.Name()), mode); err != nil { + return err + } + } + return nil } // newSnapshot creates a new file system snapshot of the given root directory. 
The snapshot is created by copying @@ -111,7 +139,7 @@ func newSnapshot(ctx context.Context, storageRoot, snapshotRoot string, relative if readOnly { // Now that we've finished creating the snapshot, change the directory permissions to read-only // to prevent writing in the snapshot. - if err := s.setDirectoryMode(ModeReadOnlyDirectory); err != nil { + if err := s.setDirectoryMode(ModeReadOnlyDirectory, relativePaths); err != nil { return nil, fmt.Errorf("make read-only: %w", err) } } diff --git a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go index 134df086a6461f5710190fb2facd6b095c0d24ff..f1eb6ecbd1071a6c4ba3b26748741ef334a1ad14 100644 --- a/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go +++ b/internal/gitaly/storage/storagemgr/partition/snapshot/snapshot_filter_test.go @@ -238,7 +238,7 @@ func getExpectedDirectoryStateAfterSnapshotFilter(ctx context.Context, isExclusi stateMustStay := testhelper.DirectoryState{ // The snapshotting process does not use the existing permissions for // directories in the hierarchy before the repository directories. 
- "/": {Mode: dirMode()}, + "/": {Mode: mode.Directory}, "/repositories": {Mode: dirMode()}, "/pools": {Mode: dirMode()}, diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 32343607795b362ff8c6fb18c5ada433e689f0a0..210ac556051df038ca13589ac711e9396cc9ae24 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1399,7 +1399,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } if step.UpdateGitConfig != nil { - updateGitConfig(t, ctx, rewrittenRepo, step.UpdateGitConfig, transaction) + updateGitConfig(t, storage.ContextWithTransaction(ctx, transaction), rewrittenRepo, step.UpdateGitConfig, transaction) } if step.QuarantinedPacks != nil { @@ -1415,17 +1415,17 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } for _, pack := range step.QuarantinedPacks { - require.NoError(t, rewrittenRepo.UnpackObjects(ctx, bytes.NewReader(pack))) + require.NoError(t, rewrittenRepo.UnpackObjects(storage.ContextWithTransaction(ctx, transaction), bytes.NewReader(pack))) } } if step.ReferenceUpdates != nil { - require.NoError(t, performReferenceUpdates(t, ctx, transaction, rewrittenRepo, step.ReferenceUpdates)) + require.NoError(t, performReferenceUpdates(t, storage.ContextWithTransaction(ctx, transaction), transaction, rewrittenRepo, step.ReferenceUpdates)) } if step.DefaultBranchUpdate != nil { require.NoError(t, rewrittenRepo.SetDefaultBranch(storage.ContextWithTransaction(ctx, transaction), nil, step.DefaultBranchUpdate.Reference)) - require.NoError(t, transaction.UpdateReferences(ctx, map[git.ReferenceName]git.ReferenceUpdate{ + require.NoError(t, transaction.UpdateReferences(storage.ContextWithTransaction(ctx, transaction), map[git.ReferenceName]git.ReferenceUpdate{ "HEAD": {NewTarget: step.DefaultBranchUpdate.Reference}, 
})) } @@ -1475,7 +1475,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it") transaction := openTransactions[step.TransactionID] - require.NoError(t, transaction.RecordInitialReferenceValues(ctx, step.InitialValues)) + require.NoError(t, transaction.RecordInitialReferenceValues(storage.ContextWithTransaction(ctx, transaction), step.InitialValues)) case UpdateReferences: require.Contains(t, openTransactions, step.TransactionID, "test error: reference updates aborted on committed before beginning it") @@ -1488,7 +1488,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas }), ) - require.Equal(t, step.ExpectedError, performReferenceUpdates(t, ctx, transaction, rewrittenRepo, step.ReferenceUpdates)) + require.Equal(t, step.ExpectedError, performReferenceUpdates(t, storage.ContextWithTransaction(ctx, transaction), transaction, rewrittenRepo, step.ReferenceUpdates)) case ReadKey: require.Contains(t, openTransactions, step.TransactionID, "test error: read key called on transaction before beginning it") @@ -1575,25 +1575,25 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas repo := setup.RepositoryFactory.Build(repoProto) if step.DefaultBranch != "" { - require.NoError(t, repo.SetDefaultBranch(ctx, nil, step.DefaultBranch)) + require.NoError(t, repo.SetDefaultBranch(storage.ContextWithTransaction(ctx, transaction), nil, step.DefaultBranch)) } for _, pack := range step.Packs { - require.NoError(t, repo.UnpackObjects(ctx, bytes.NewReader(pack))) + require.NoError(t, repo.UnpackObjects(storage.ContextWithTransaction(ctx, transaction), bytes.NewReader(pack))) } for name, oid := range step.References { - require.NoError(t, repo.UpdateRef(ctx, name, oid, setup.ObjectHash.ZeroOID)) + require.NoError(t, 
repo.UpdateRef(storage.ContextWithTransaction(ctx, transaction), name, oid, setup.ObjectHash.ZeroOID)) } if step.CustomHooks != nil { require.NoError(t, - repoutil.SetCustomHooks(ctx, logger, config.NewLocator(setup.Config), nil, bytes.NewReader(step.CustomHooks), repo), + repoutil.SetCustomHooks2(storage.ContextWithTransaction(ctx, transaction), logger, config.NewLocator(setup.Config), nil, bytes.NewReader(step.CustomHooks), repo, true), ) } if step.Alternate != "" { - repoPath, err := repo.Path(ctx) + repoPath, err := repo.Path(storage.ContextWithTransaction(ctx, transaction)) require.NoError(t, err) alternatesPath := stats.AlternatesFilePath(repoPath) @@ -1636,7 +1636,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] - RequireRepositories(t, ctx, setup.Config, + RequireRepositories(t, storage.ContextWithTransaction(ctx, transaction), setup.Config, // Assert the contents of the transaction's snapshot. filepath.Join(setup.Config.Storages[0].Path, transaction.snapshot.Prefix()), // Rewrite all of the repositories to point to their snapshots. 
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 4b6717850f47fa16eec08f0873974bb020c3933b..62632be06246bde6641922cfcd3d28e8f4de69e9 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -26,6 +26,7 @@ import ( housekeepingcfg "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/reftable" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" @@ -413,20 +414,23 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn.fs = fsrecorder.NewFS(txn.snapshot.Root(), txn.walEntry) if txn.repositoryTarget() { - txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath)) + //txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath)) + txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.relativePath) if err != nil { return nil, fmt.Errorf("does repository exist: %w", err) } - txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath)) + //txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath)) + txn.snapshotRepository = mgr.repositoryFactory.Build(txn.relativePath) if txn.write { if txn.repositoryExists { + ctxWithTxn := storage.ContextWithTransaction(ctx, txn) txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine") if err := os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), mode.Directory); err != nil { return nil, 
fmt.Errorf("create quarantine directory: %w", err) } - txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctx, txn.quarantineDirectory) + txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctxWithTxn, txn.quarantineDirectory) if err != nil { return nil, fmt.Errorf("quarantine: %w", err) } @@ -436,13 +440,13 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti return nil, fmt.Errorf("create reference recorder tmp dir: %w", err) } - refBackend, err := txn.snapshotRepository.ReferenceBackend(ctx) + refBackend, err := txn.snapshotRepository.ReferenceBackend(ctxWithTxn) if err != nil { return nil, fmt.Errorf("reference backend: %w", err) } if refBackend == git.ReferenceBackendFiles { - objectHash, err := txn.snapshotRepository.ObjectHash(ctx) + objectHash, err := txn.snapshotRepository.ObjectHash(ctxWithTxn) if err != nil { return nil, fmt.Errorf("object hash: %w", err) } @@ -453,7 +457,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti } if refBackend == git.ReferenceBackendReftables { - snapshotRepositoryPath, err := txn.snapshotRepository.Path(ctx) + snapshotRepositoryPath, err := txn.snapshotRepository.Path(ctxWithTxn) if err != nil { return nil, fmt.Errorf("snapshot repository path: %w", err) } @@ -516,7 +520,8 @@ func (txn *Transaction) PartitionRelativePaths() []string { // the repository in the transaction's snapshot. 
func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.Repository { rewritten := proto.Clone(repo).(*gitalypb.Repository) - rewritten.RelativePath = txn.snapshot.RelativePath(repo.GetRelativePath()) + //rewritten.RelativePath = txn.snapshot.RelativePath(repo.GetRelativePath()) + //rewritten.RelativePath = repo.GetRelativePath() if repo.GetRelativePath() == txn.relativePath { rewritten.GitObjectDirectory = txn.snapshotRepository.GetGitObjectDirectory() @@ -1078,6 +1083,7 @@ type resultChannel chan commitResult // commit queues the transaction for processing and returns once the result has been determined. func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) (storage.LSN, error) { span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() transaction.result = make(resultChannel, 1) @@ -1229,6 +1235,7 @@ func (txn *Transaction) referenceUpdatesToProto() []*gitalypb.LogEntry_Reference // complete state and stages it into the transaction for committing. func (mgr *TransactionManager) stageRepositoryCreation(ctx context.Context, transaction *Transaction) error { span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.stageRepositoryCreation", nil) + //ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() objectHash, err := transaction.snapshotRepository.ObjectHash(ctx) @@ -1264,23 +1271,29 @@ func (mgr *TransactionManager) stageRepositoryCreation(ctx context.Context, tran // setupStagingRepository sets up a snapshot that is used for verifying and staging changes. It contains up to // date state of the partition. It does not have the quarantine configured. 
-func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, transaction *Transaction) (*localrepo.Repo, error) { +func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, transaction *Transaction) (context.Context, *localrepo.Repo, error) { defer trace.StartRegion(ctx, "setupStagingRepository").End() span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.setupStagingRepository", nil) defer span.Finish() if transaction.stagingSnapshot != nil { - return nil, errors.New("staging snapshot already setup") + return ctx, nil, errors.New("staging snapshot already setup") } var err error transaction.stagingSnapshot, err = mgr.snapshotManager.GetSnapshot(ctx, []string{transaction.relativePath}, true) if err != nil { - return nil, fmt.Errorf("new snapshot: %w", err) + return ctx, nil, fmt.Errorf("new snapshot: %w", err) } - return mgr.repositoryFactory.Build(transaction.stagingSnapshot.RelativePath(transaction.relativePath)), nil + // a wrapped ctx that point to staging root + stagingTxn := &Transaction{ + fs: fsrecorder.NewFS(transaction.stagingSnapshot.Root(), nil), + } + stagingCtx := storage.ContextWithTransaction(ctx, stagingTxn) + + return stagingCtx, mgr.repositoryFactory.Build(transaction.relativePath), nil } // packPrefixRegexp matches the output of `git index-pack` where it @@ -1307,7 +1320,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return nil } - if _, err := os.Stat(mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath())); err != nil { + if _, err := os.Stat(mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath())); err != nil { if !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("stat: %w", err) } @@ -1318,6 +1331,8 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra } span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.packObjects", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) + defer 
span.Finish() // We want to only pack the objects that are present in the quarantine as they are potentially @@ -1494,7 +1509,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra // why we don't track 'tables.list' operation here. func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, transaction *Transaction) error { runPackRefs := transaction.runHousekeeping.packRefs - repoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) if err := allowReftableCompaction(repoPath); err != nil { return fmt.Errorf("allow reftable compaction: %w", err) @@ -1590,7 +1605,7 @@ func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transac // First walk to collect the list of loose refs. looseReferences := make(map[git.ReferenceName]struct{}) - repoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) if err := filepath.WalkDir(filepath.Join(repoPath, "refs"), func(path string, entry fs.DirEntry, err error) error { if err != nil { return err @@ -1764,7 +1779,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned transaction.result <- func() commitResult { var zeroOID git.ObjectID if transaction.repositoryTarget() { - repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath) + repositoryExists, err := mgr.doesRepositoryExist(nil, transaction.relativePath) if err != nil { return commitResult{error: fmt.Errorf("does repository exist: %w", err)} } @@ -1801,7 +1816,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned if refBackend == git.ReferenceBackendReftables || transaction.runHousekeeping != nil { if refBackend == git.ReferenceBackendReftables { if err := transaction.reftableRecorder.stageTables(ctx, - 
mgr.getAbsolutePath(transaction.relativePath), + mgr.getAbsolutePath(ctx, transaction.relativePath), transaction, ); err != nil { return commitResult{error: fmt.Errorf("stage tables: %w", err)} @@ -1844,6 +1859,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned // Prepare the transaction to conflict check it. We'll commit it later if we // succeed logging the transaction. mgr.mutex.Lock() + //conflictMgrCtx := storage.ContextWithTransaction(ctx, transaction) preparedTX, err := mgr.conflictMgr.Prepare(ctx, &conflict.Transaction{ ReadLSN: transaction.SnapshotLSN(), TargetRelativePath: transaction.relativePath, @@ -1956,6 +1972,18 @@ func (mgr *TransactionManager) verifyFileSystemOperations(ctx context.Context, t } if err := fsTX.Remove(path); err != nil { + //entries, err := os.ReadDir(path) + //var entriesString []string + //fmt.Printf("Contents of directory '%s':\n", path) + //for _, entry := range entries { + // if entry.IsDir() { + // fmt.Printf("[DIR] %s\n", entry.Name()) + // } else { + // fmt.Printf("[FILE] %s\n", entry.Name()) + // } + // entriesString = append(entriesString, entry.Name()) + //} + //return nil, fmt.Errorf("remove: dirs %v, %w", entriesString, err) return nil, fmt.Errorf("remove: %w", err) } } @@ -2103,7 +2131,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relativePath string) (bool, error) { defer trace.StartRegion(ctx, "doesRepositoryExist").End() - stat, err := os.Stat(mgr.getAbsolutePath(relativePath)) + stat, err := os.Stat(mgr.getAbsolutePath(ctx, relativePath)) if err != nil { if errors.Is(err, fs.ErrNotExist) { return false, nil @@ -2119,8 +2147,15 @@ func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relative return true, nil } -// getAbsolutePath returns the relative path's absolute path in the storage. 
-func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string { +// getAbsolutePath returns the relative path's absolute path in the storage. when ctx is nil, you get the +// original repo abs path, otherwise you get snapshot repo abs path +func (mgr *TransactionManager) getAbsolutePath(ctx context.Context, relativePath ...string) string { + if ctx != nil { + if txn := storage.ExtractTransaction(ctx); txn != nil { + return filepath.Join(append([]string{txn.FS().Root()}, relativePath...)...) + } + } + return filepath.Join(append([]string{mgr.storagePath}, relativePath...)...) } @@ -2129,6 +2164,196 @@ func packFilePath(walFiles string) string { return filepath.Join(walFiles, "transaction.pack") } +// verifyReferences verifies that the references in the transaction apply on top of the already accepted +// reference changes. The old tips in the transaction are verified against the current actual tips. +// It returns the write-ahead log entry for the reference transactions successfully verified. +func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction *Transaction) error { + defer trace.StartRegion(ctx, "verifyReferences").End() + + if len(transaction.referenceUpdates) == 0 { + return nil + } + + span, _ := tracing.StartSpanIfHasParent(ctx, "transaction.verifyReferences", nil) + defer span.Finish() + + stagingCtx, stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) + if err != nil { + return fmt.Errorf("setup staging snapshot: %w", err) + } + + // Apply quarantine to the staging repository in order to ensure the new objects are available when we + // are verifying references. Without it we'd encounter errors about missing objects as the new objects + // are not in the repository. 
+ stagingRepositoryWithQuarantine, err := stagingRepository.Quarantine(stagingCtx, transaction.quarantineDirectory) + if err != nil { + return fmt.Errorf("quarantine: %w", err) + } + + if err := mgr.verifyReferencesWithGitForReftables(stagingCtx, transaction.manifest.GetReferenceTransactions(), transaction, stagingRepositoryWithQuarantine); err != nil { + return fmt.Errorf("verify references with git: %w", err) + } + + return nil +} + +// verifyReferencesWithGitForReftables is responsible for converting the logical reference updates +// to transaction operations. +// +// To ensure that we don't modify existing tables and autocompact, we lock the existing tables +// before applying the updates. This way the reftable backend will only create new tables +func (mgr *TransactionManager) verifyReferencesWithGitForReftables( + ctx context.Context, + referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, + tx *Transaction, + repo *localrepo.Repo, +) error { + reftablePath := mgr.getAbsolutePath(ctx, repo.GetRelativePath(), "reftable/") + existingTables := make(map[string]struct{}) + lockedTables := make(map[string]struct{}) + + // reftableWalker allows us to walk the reftable directory. + reftableWalker := func(handler func(path string) error) fs.WalkDirFunc { + return func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + if filepath.Base(path) == "reftable" { + return nil + } + + return fmt.Errorf("unexpected directory: %s", filepath.Base(path)) + } + + return handler(path) + } + } + + // We first track the existing tables in the reftable directory. + if err := filepath.WalkDir( + reftablePath, + reftableWalker(func(path string) error { + if filepath.Base(path) == "tables.list" { + return nil + } + + existingTables[path] = struct{}{} + + return nil + }), + ); err != nil { + return fmt.Errorf("finding reftables: %w", err) + } + + // We then lock existing tables as to disable the autocompaction. 
+ for table := range existingTables { + lockedPath := table + ".lock" + + f, err := os.Create(lockedPath) + if err != nil { + return fmt.Errorf("creating reftable lock: %w", err) + } + if err = f.Close(); err != nil { + return fmt.Errorf("closing reftable lock: %w", err) + } + + lockedTables[lockedPath] = struct{}{} + } + + // Since autocompaction is now disabled, adding references will + // add new tables but not compact them. + for _, referenceTransaction := range referenceTransactions { + if err := mgr.applyReferenceTransaction(ctx, referenceTransaction.GetChanges(), repo); err != nil { + return fmt.Errorf("applying reference: %w", err) + } + } + + // With this, we can track the new tables added along with the 'tables.list' + // as operations on the transaction. + if err := filepath.WalkDir( + reftablePath, + reftableWalker(func(path string) error { + if _, ok := lockedTables[path]; ok { + return nil + } + + if _, ok := existingTables[path]; ok { + return nil + } + + base := filepath.Base(path) + + if base == "tables.list" { + tx.walEntry.RemoveDirectoryEntry(filepath.Join(tx.relativePath, "reftable", base)) + } + return tx.walEntry.CreateFile(path, filepath.Join(tx.relativePath, "reftable", base)) + }), + ); err != nil { + return fmt.Errorf("finding reftables: %w", err) + } + + // Finally release the locked tables. + for lockedTable := range lockedTables { + if err := os.Remove(lockedTable); err != nil { + return fmt.Errorf("deleting locked file: %w", err) + } + } + + return nil +} + +// applyReferenceTransaction applies a reference transaction with `git update-ref`. 
+func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, changes []*gitalypb.LogEntry_ReferenceTransaction_Change, repository *localrepo.Repo) (returnedErr error) { + defer trace.StartRegion(ctx, "applyReferenceTransaction").End() + + updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions(), updateref.WithNoDeref()) + if err != nil { + return fmt.Errorf("new: %w", err) + } + defer func() { + if err := updater.Close(); err != nil { + returnedErr = errors.Join(returnedErr, fmt.Errorf("close updater: %w", err)) + } + }() + + if err := updater.Start(); err != nil { + return fmt.Errorf("start: %w", err) + } + + version, err := repository.GitVersion(ctx) + if err != nil { + return fmt.Errorf("git version: %w", err) + } + + for _, change := range changes { + if len(change.GetNewTarget()) > 0 { + if err := updater.UpdateSymbolicReference( + version, + git.ReferenceName(change.GetReferenceName()), + git.ReferenceName(change.GetNewTarget()), + ); err != nil { + return fmt.Errorf("update symref %q: %w", change.GetReferenceName(), err) + } + } else { + if err := updater.Update(git.ReferenceName(change.GetReferenceName()), git.ObjectID(change.GetNewOid()), ""); err != nil { + return fmt.Errorf("update %q: %w", change.GetReferenceName(), err) + } + } + } + + if err := updater.Prepare(); err != nil { + return fmt.Errorf("prepare: %w", err) + } + + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit: %w", err) + } + + return nil +} + // appendLogEntry appends a log entry of a transaction to the write-ahead log. After the log entry is appended to WAL, // the corresponding snapshot lock and in-memory reference for the latest appended LSN is created. 
func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go index 5bdd2debb2dded457a03187224aacbf16ef50559..cf156e9c1b1522a44bada2d7dff362e41a980924 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go @@ -2,6 +2,9 @@ package partition import ( "bytes" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/fshistory" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "path/filepath" "testing" @@ -10,9 +13,6 @@ import ( housekeepingcfg "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/fshistory" - "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) func generateAlternateTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go index e607bb8d7d655711ec4f9623396a3d05060f2757..b1494c12e9a315ed61203ed159564443a265d4f0 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go @@ -122,6 +122,7 @@ func (mgr *TransactionManager) prepareHousekeeping(ctx 
context.Context, transact } span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareHousekeeping", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "prepare") @@ -214,8 +215,8 @@ func (mgr *TransactionManager) prepareRepacking(ctx context.Context, transaction // Build a working repository pointing to snapshot repository. Housekeeping task can access the repository // without the needs for quarantine. - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) - repoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) + repoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) isFullRepack, err := housekeeping.ValidateRepacking(repack.config) if err != nil { @@ -422,7 +423,7 @@ func (mgr *TransactionManager) prepareCommitGraphs(ctx context.Context, transact } if err := housekeeping.WriteCommitGraph(ctx, - mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)), + mgr.repositoryFactory.Build(transaction.relativePath), transaction.runHousekeeping.writeCommitGraphs.config, ); err != nil { return fmt.Errorf("re-writing commit graph: %w", err) @@ -471,6 +472,7 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti defer trace.StartRegion(ctx, "verifyHousekeeping").End() span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.verifyHousekeeping", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "verify") @@ -589,7 +591,7 @@ func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction // Setup a working repository of the destination repository and all changes of current transactions. 
All // concurrent changes must land in that repository already. - stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) + stagingCtx, stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) if err != nil { return fmt.Errorf("setting up new snapshot for verifying repacking: %w", err) } @@ -626,7 +628,7 @@ func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction return fmt.Errorf("walking committed entries: %w", err) } - if err := mgr.verifyObjectsExist(ctx, stagingRepository, objectDependencies); err != nil { + if err := mgr.verifyObjectsExist(stagingCtx, stagingRepository, objectDependencies); err != nil { var errInvalidObject localrepo.InvalidObjectError if errors.As(err, &errInvalidObject) { return errRepackConflictPrunedObject @@ -653,13 +655,13 @@ func (mgr *TransactionManager) verifyPackRefsReftable(transaction *Transaction) // repository before the compaction. However, concurrent writes might have occurred which // wrote new tables to the target repository. We shouldn't loose that data. So we merge // the compacted tables.list with the newer tables from the target repository's tables.list. - repoPath := mgr.getAbsolutePath(transaction.relativePath) + repoPath := mgr.getAbsolutePath(nil, transaction.relativePath) newTableList, err := reftable.ReadTablesList(repoPath) if err != nil { return nil, fmt.Errorf("reading tables.list: %w", err) } - snapshotRepoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + snapshotRepoPath := mgr.getAbsolutePath(storage.ContextWithTransaction(context.Background(), transaction), transaction.snapshotRepository.GetRelativePath()) // tables.list is hard-linked from the repository to the snapshot, we shouldn't // directly write to it as we'd modify the original. So let's remove the @@ -726,6 +728,7 @@ func (mgr *TransactionManager) verifyPackRefsReftable(transaction *Transaction) // parsing and regenerating the packed-refs file. 
So, let's settle down with a conflict error at this point. func (mgr *TransactionManager) verifyPackRefsFiles(ctx context.Context, transaction *Transaction, zeroOID git.ObjectID) (*gitalypb.LogEntry_Housekeeping_PackRefs, error) { packRefs := transaction.runHousekeeping.packRefs + ctx = storage.ContextWithTransaction(ctx, transaction) // Check for any concurrent ref deletion between this transaction's snapshot LSN to the end. if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry, objectDependencies map[git.ObjectID]struct{}) error { @@ -788,7 +791,7 @@ func (mgr *TransactionManager) verifyPackRefsFiles(ctx context.Context, transact if isDir { // If this is a directory, we need to ensure it is actually empty before removing // it. Check if we find any directory entries we haven't yet deleted. - entries, err := os.ReadDir(mgr.getAbsolutePath(relativePath)) + entries, err := os.ReadDir(mgr.getAbsolutePath(nil, relativePath)) if err != nil { return fmt.Errorf("read dir: %w", err) } @@ -843,10 +846,10 @@ func (mgr *TransactionManager) prepareOffloading(ctx context.Context, transactio // Loading configurations for offloading cfg := transaction.runHousekeeping.runOffloading.config - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) // Find the original repository's absolute path. In the context of transaction, originalRepo is the repo // which we are taking a snapshot of. 
originalRepo := &gitalypb.Repository{ @@ -979,10 +982,10 @@ func (mgr *TransactionManager) prepareRehydrating(ctx context.Context, transacti span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareRehydrating", nil) defer span.Finish() - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) prefix := transaction.runHousekeeping.runRehydrating.prefix packFilesToDownload, err := mgr.offloadingSink.List(ctx, prefix) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go index 70241593c8c055e3865e8df5d3047d2fc8cd8054..05301164c446059468e53100ffe044d5765964d5 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go @@ -1,9 +1,6 @@ package partition import ( - "path/filepath" - "testing" - "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" @@ -12,6 +9,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/fshistory" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "path/filepath" + "testing" ) func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { diff --git 
a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index e1a99c5bf22b5d3ce9a9e211152918e61cbea505..d08f6504b0056e9e4bed87fb589241b2b2bec210 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -372,8 +372,9 @@ func TestTransactionManager(t *testing.T) { "Housekeeping/CommitGraphs": generateHousekeepingCommitGraphsTests(t, ctx, setup), "Consumer": generateConsumerTests(t, setup), "KeyValue": generateKeyValueTests(t, setup), - "Offloading": generateOffloadingTests(t, ctx, testPartitionID, relativePath), - "Rehydrating": generateRehydratingTests(t, ctx, testPartitionID, relativePath), + + "Offloading": generateOffloadingTests(t, ctx, testPartitionID, relativePath), + "Rehydrating": generateRehydratingTests(t, ctx, testPartitionID, relativePath), } for desc, tests := range subTests { @@ -507,8 +508,8 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio desc: "commit returns if transaction processing stops before transaction acceptance", skip: func(t *testing.T) { testhelper.SkipWithRaft(t, `The hook is installed before appending log entry, before - recorder is activated. Hence, it's not feasible to differentiate between normal - entries and Raft internal entries`) + recorder is activated. 
Hence, it's not feasible to differentiate between normal + entries and Raft internal entries`) }, steps: steps{ StartManager{ diff --git a/internal/tempdir/clean.go b/internal/tempdir/clean.go index cdd957fdf351092a8f1f9472bd817b7f9a9ce654..7d40b30d57b471db8b63eb6254a999031eacfdec 100644 --- a/internal/tempdir/clean.go +++ b/internal/tempdir/clean.go @@ -63,7 +63,7 @@ func clean(logger log.Logger, locator storage.Locator, storage config.Storage) e ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dir, err := locator.TempDir(storage.Name) + dir, err := locator.TempDir(ctx, storage.Name) if err != nil { return fmt.Errorf("temporary dir: %w", err) } diff --git a/internal/tempdir/clean_test.go b/internal/tempdir/clean_test.go index a608ad8fe00caece0ccceb2908064169123f8c53..fb66416fce7c5563fa6d8e015dbc7c40ecc8a896 100644 --- a/internal/tempdir/clean_test.go +++ b/internal/tempdir/clean_test.go @@ -1,6 +1,7 @@ package tempdir import ( + "context" "os" "path/filepath" "testing" @@ -19,7 +20,7 @@ func TestCleanSuccess(t *testing.T) { cfg := testcfg.Build(t) locator := config.NewLocator(cfg) - cleanRoot, err := locator.TempDir(cfg.Storages[0].Name) + cleanRoot, err := locator.TempDir(nil, cfg.Storages[0].Name) require.NoError(t, err) require.NoError(t, os.MkdirAll(cleanRoot, mode.Directory), "create clean root before setup") @@ -90,7 +91,7 @@ type mockLocator struct { storage.Locator } -func (m mockLocator) TempDir(storageName string) (string, error) { +func (m mockLocator) TempDir(ctx context.Context, storageName string) (string, error) { return "something", nil } @@ -129,19 +130,19 @@ func TestDedupStorages(t *testing.T) { } func chmod(t *testing.T, locator storage.Locator, storage config.Storage, p string, mode os.FileMode) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(nil, storage.Name) require.NoError(t, err) require.NoError(t, os.Chmod(filepath.Join(root, p), mode)) } func chtimes(t *testing.T, locator 
storage.Locator, storage config.Storage, p string, date time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(nil, storage.Name) require.NoError(t, err) require.NoError(t, os.Chtimes(filepath.Join(root, p), date, date)) } func assertEntries(t *testing.T, locator storage.Locator, storage config.Storage, entries ...string) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(nil, storage.Name) require.NoError(t, err) foundEntries, err := os.ReadDir(root) @@ -155,7 +156,7 @@ func assertEntries(t *testing.T, locator storage.Locator, storage config.Storage } func makeFile(t *testing.T, locator storage.Locator, storage config.Storage, filePath string, mtime time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(nil, storage.Name) require.NoError(t, err) fullPath := filepath.Join(root, filePath) @@ -164,7 +165,7 @@ func makeFile(t *testing.T, locator storage.Locator, storage config.Storage, fil } func makeDir(t *testing.T, locator storage.Locator, storage config.Storage, dirPath string, mtime time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(nil, storage.Name) require.NoError(t, err) fullPath := filepath.Join(root, dirPath) diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go index f70c1216ce0aa83c362b12318ebb1aa166530d8c..6c288d3ba7d1827010fd407d3861c701a8190895 100644 --- a/internal/tempdir/tempdir.go +++ b/internal/tempdir/tempdir.go @@ -76,7 +76,7 @@ func NewRepository(ctx context.Context, storageName string, logger log.Logger, l } func newDirectory(ctx context.Context, storageName string, prefix string, logger log.Logger, loc storage.Locator) (Dir, error) { - root, err := loc.TempDir(storageName) + root, err := loc.TempDir(ctx, storageName) if err != nil { return Dir{}, fmt.Errorf("temp directory: %w", err) }