From 987647a7d357b129cb4d552fdec2817be44783ae Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Mon, 18 Aug 2025 15:38:59 -0300 Subject: [PATCH 1/6] migration: Fix flaky leftover migration test Sharing an array of migrations between test cases caused race conditions. This commit fixes the issue by isolating the migrations array within each test case. --- .../migration/leftover_file_migration_test.go | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go index 6004432011d..3e05de7aff1 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration_test.go @@ -6,7 +6,6 @@ import ( "maps" "os" "path/filepath" - "sync" "testing" "github.com/stretchr/testify/require" @@ -35,15 +34,7 @@ func TestNewLeftoverFileMigration_WithOrWithoutFeatureFlag(t *testing.T) { } func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { - var mu sync.RWMutex t.Parallel() - cfg := testcfg.Build(t) - - var migrationPtr *[]migration.Migration - var migrations []migration.Migration - migrationPtr = &migrations - repoClient, socket := runGitalyServer(t, cfg, testserver.WithMigrations(migrationPtr)) - cfg.SocketPath = socket for _, tc := range []struct { desc string @@ -60,6 +51,14 @@ func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() + cfg := testcfg.Build(t) + + var migrationPtr *[]migration.Migration + var migrations []migration.Migration + migrationPtr = &migrations + repoClient, socket := runGitalyServer(t, cfg, testserver.WithMigrations(migrationPtr)) + cfg.SocketPath = socket + poolSetup := createLeftoverMigrationRepo(t, ctx, cfg, true, "") repoSetup := createLeftoverMigrationRepo(t, ctx, 
cfg, false, poolSetup.repoPath) @@ -92,14 +91,11 @@ func testNewLeftoverFileMigration(t *testing.T, ctx context.Context) { } } - // Avoid racing if other test also change the migration slice. - mu.Lock() migrations = []migration.Migration{migration.NewLeftoverFileMigration(config.NewLocator(cfg))} _, err = repoClient.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{ Repository: repoSetup.repo, }) - mu.Unlock() require.NoError(t, err) // Force WAL sync to ensure migration transaction effects are applied to disk -- GitLab From d1e4416ef3959cd538890b3d69f7e202304182b8 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Mon, 8 Sep 2025 20:37:46 -0400 Subject: [PATCH 2/6] catfile: Refactor catfile cache key to include storage path Previously, the cache key included only the repository's relative path. This worked because a snapshot repository's relative path was different from its original path. For example: - Original: `@hashed/6c/a0/6ca0xxxx.git` - Snapshot: `+gitaly/staging/zz/snapshots/1/@hashed/6c/a0/6ca0xxxx.git` Since the relative paths differed, their catfile cache keys were also different. After aligning the relative paths of original and snapshot repositories, we now need a way to differentiate their cache keys. This commit adds the storage path as a field in the cache key. For example, an original repository storage path is `gitaly-xxx/yyy/storages.d/default`, while a snapshot repository may reside under `gitaly-xxx/yyy/storages.d/default/+gitaly/staging/zz/snapshots/1`.
--- internal/git/catfile/cache.go | 44 +++++++++++++++-------- internal/git/catfile/cache_test.go | 48 +++++++++++++++---------- internal/git/catfile/testhelper_test.go | 3 +- 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/internal/git/catfile/cache.go b/internal/git/catfile/cache.go index 59d57a5eead..f3b6f20196c 100644 --- a/internal/git/catfile/cache.go +++ b/internal/git/catfile/cache.go @@ -58,6 +58,7 @@ type ProcessCache struct { monitorTicker helper.Ticker monitorDone chan struct{} + locator storage.Locator objectReaders processes objectReadersWithoutMailmap processes @@ -70,16 +71,18 @@ type ProcessCache struct { // NewCache creates a new catfile process cache. func NewCache(cfg config.Cfg) *ProcessCache { - return newCache(defaultBatchfileTTL, cfg.Git.CatfileCacheSize, helper.NewTimerTicker(defaultEvictionInterval)) + return newCache(defaultBatchfileTTL, cfg.Git.CatfileCacheSize, helper.NewTimerTicker(defaultEvictionInterval), + config.NewLocator(cfg)) } -func newCache(ttl time.Duration, maxLen int, monitorTicker helper.Ticker) *ProcessCache { +func newCache(ttl time.Duration, maxLen int, monitorTicker helper.Ticker, locator storage.Locator) *ProcessCache { if maxLen <= 0 { maxLen = defaultMaxLen } processCache := &ProcessCache{ - ttl: ttl, + ttl: ttl, + locator: locator, objectReaders: processes{ maxLen: maxLen, }, @@ -241,7 +244,16 @@ func (c *ProcessCache) getOrCreateProcess( span, ctx := tracing.StartSpanIfHasParent(ctx, spanName, nil) defer span.Finish() - cacheKey, isCacheable := newCacheKey(fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())), repo) + // The storagePath is included in the cache key because + // snapshot repositories and their corresponding original repositories share the + // same relative path. To differentiate between them and ensure each has a unique + // cache key, we incorporate the storage path as part of the key. 
+ storagePath, err := c.locator.GetStorageByName(ctx, repo.GetStorageName()) + if err != nil { + return nil, nil, fmt.Errorf("storage path: %w", err) + } + cacheKey, isCacheable := newCacheKey(fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())), + storagePath, repo) if isCacheable { // We only try to look up cached processes in case it is cacheable, which requires a @@ -363,24 +375,26 @@ func (c *ProcessCache) returnToCache(p *processes, cacheKey key, value cacheable } type key struct { - sessionID string - repoStorage string - repoRelPath string - repoObjDir string - repoAltDir string + sessionID string + repoStorageName string + repoStoragePath string + repoRelPath string + repoObjDir string + repoAltDir string } -func newCacheKey(sessionID string, repo storage.Repository) (key, bool) { +func newCacheKey(sessionID string, storagePath string, repo storage.Repository) (key, bool) { if sessionID == "" { return key{}, false } return key{ - sessionID: sessionID, - repoStorage: repo.GetStorageName(), - repoRelPath: repo.GetRelativePath(), - repoObjDir: repo.GetGitObjectDirectory(), - repoAltDir: strings.Join(repo.GetGitAlternateObjectDirectories(), ","), + sessionID: sessionID, + repoStorageName: repo.GetStorageName(), + repoStoragePath: storagePath, + repoRelPath: repo.GetRelativePath(), + repoObjDir: repo.GetGitObjectDirectory(), + repoAltDir: strings.Join(repo.GetGitAlternateObjectDirectories(), ","), }, true } diff --git a/internal/git/catfile/cache_test.go b/internal/git/catfile/cache_test.go index f893fa5c84e..9c2de1d709e 100644 --- a/internal/git/catfile/cache_test.go +++ b/internal/git/catfile/cache_test.go @@ -173,10 +173,12 @@ func TestCache_autoExpiry(t *testing.T) { monitorTicker := helper.NewManualTicker() - c := newCache(time.Hour, 10, monitorTicker) + cfg := testcfg.Build(t) + locator := config.NewLocator(cfg) + + c := newCache(time.Hour, 10, monitorTicker, locator) defer c.Stop() - cfg := testcfg.Build(t) repo, _ := gittest.CreateRepository(t, ctx, 
cfg, gittest.CreateRepositoryConfig{ SkipCreationViaService: true, }) @@ -269,7 +271,10 @@ func TestCache_ObjectReader(t *testing.T) { repoExecutor := newRepoExecutor(t, cfg, repo) - cache := newCache(time.Hour, 10, helper.NewManualTicker()) + locator := config.NewLocator(cfg) + storagePath, err := locator.GetStorageByName(ctx, repo.GetStorageName()) + require.NoError(t, err) + cache := newCache(time.Hour, 10, helper.NewManualTicker(), locator) defer cache.Stop() t.Run("cached", func(t *testing.T) { @@ -286,9 +291,10 @@ func TestCache_ObjectReader(t *testing.T) { expectedSessionID := fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())) require.Equal(t, []key{{ - sessionID: expectedSessionID, - repoStorage: repo.GetStorageName(), - repoRelPath: repo.GetRelativePath(), + sessionID: expectedSessionID, + repoStorageName: repo.GetStorageName(), + repoRelPath: repo.GetRelativePath(), + repoStoragePath: storagePath, }}, allKeys) // Assert that we can still read from the cached process. @@ -346,8 +352,10 @@ func TestCache_ObjectReaderWithoutMailmap(t *testing.T) { gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) repoExecutor := newRepoExecutor(t, cfg, repo) - - cache := newCache(time.Hour, 10, helper.NewManualTicker()) + locator := config.NewLocator(cfg) + storagePath, err := locator.GetStorageByName(ctx, repo.GetStorageName()) + require.NoError(t, err) + cache := newCache(time.Hour, 10, helper.NewManualTicker(), locator) defer cache.Stop() t.Run("cached", func(t *testing.T) { @@ -365,9 +373,10 @@ func TestCache_ObjectReaderWithoutMailmap(t *testing.T) { expectedSessionID := fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())) require.Equal(t, []key{{ - sessionID: expectedSessionID, - repoStorage: repo.GetStorageName(), - repoRelPath: repo.GetRelativePath(), + sessionID: expectedSessionID, + repoStorageName: repo.GetStorageName(), + repoRelPath: repo.GetRelativePath(), + repoStoragePath: storagePath, }}, allKeys) // Assert that we can still read from 
the cached process. @@ -422,7 +431,7 @@ func TestCache_ObjectReader_quarantine(t *testing.T) { locator := config.NewLocator(cfg) logger := testhelper.NewLogger(t) - cache := newCache(time.Hour, 10, helper.NewManualTicker()) + cache := newCache(time.Hour, 10, helper.NewManualTicker(), locator) defer cache.Stop() t.Run("with active quarantine", func(t *testing.T) { @@ -450,12 +459,15 @@ func TestCache_ObjectReader_quarantine(t *testing.T) { expectedSessionID := fmt.Sprintf("%d", roundToNearestFiveMinute(time.Now())) + storagePath, err := locator.GetStorageByName(ctx, repo.GetStorageName()) + require.NoError(t, err) require.Equal(t, []key{{ - sessionID: expectedSessionID, - repoStorage: repo.GetStorageName(), - repoRelPath: repo.GetRelativePath(), - repoObjDir: quarantineRepo.GetGitObjectDirectory(), - repoAltDir: "objects", + sessionID: expectedSessionID, + repoStorageName: repo.GetStorageName(), + repoRelPath: repo.GetRelativePath(), + repoObjDir: quarantineRepo.GetGitObjectDirectory(), + repoAltDir: "objects", + repoStoragePath: storagePath, }}, allKeys) // Assert that we can still read from the cached process. 
@@ -488,7 +500,7 @@ func mustCreateCacheable(t *testing.T, cfg config.Cfg, repo storage.Repository) func mustCreateKey(t *testing.T, sessionID string, repo storage.Repository) key { t.Helper() - key, cacheable := newCacheKey(sessionID, repo) + key, cacheable := newCacheKey(sessionID, "some/path", repo) require.True(t, cacheable) return key diff --git a/internal/git/catfile/testhelper_test.go b/internal/git/catfile/testhelper_test.go index 11cf5d94dae..28a240e1e46 100644 --- a/internal/git/catfile/testhelper_test.go +++ b/internal/git/catfile/testhelper_test.go @@ -69,7 +69,8 @@ func setupObjectReader(t *testing.T, ctx context.Context) (config.Cfg, ObjectCon }) repoExecutor := newRepoExecutor(t, cfg, repo) - cache := newCache(1*time.Hour, 1000, helper.NewTimerTicker(defaultEvictionInterval)) + locator := config.NewLocator(cfg) + cache := newCache(1*time.Hour, 1000, helper.NewTimerTicker(defaultEvictionInterval), locator) t.Cleanup(cache.Stop) objectReader, cancel, err := cache.ObjectReader(ctx, repoExecutor) -- GitLab From e8db39fad94a05ac891a629291cde69f023dbb5f Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Mon, 18 Aug 2025 13:31:48 -0300 Subject: [PATCH 3/6] storage: Refactor TempDir to consider snapshot storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, temporary directories were created only under `<storage-path>/+gitaly/tmp`. This worked when original and snapshot repositories shared the same storage path but had different relative paths. Once the relative paths are aligned, original and snapshot repositories will have different storage paths. For example: - Original: `gitaly-x/y/storages.d/name` - Snapshot: `gitaly-x/y/storages.d/name/+gitaly/staging/z/snapshots/1` When working with a snapshot, continuing to generate temporary directories under `.../storages.d/name/+gitaly/tmp` means operating outside the snapshot scope.
Instead, temporary directories must be created under the snapshot’s storage path to remain within its scope. This commit updates TempDir generation to add this logic. --- internal/cache/walker.go | 3 +- internal/git/localrepo/paths.go | 2 +- internal/git/localrepo/repo.go | 4 +-- internal/git/localrepo/repo_test.go | 4 +-- internal/git/objectpool/pool.go | 2 +- internal/git/quarantine/quarantine.go | 8 ++++- internal/gitaly/config/locator.go | 34 +++++++++++++------ internal/gitaly/config/locator_test.go | 7 ++-- internal/gitaly/linguist/language_stats.go | 7 ++-- internal/gitaly/repoutil/remove.go | 2 +- internal/gitaly/storage/locator.go | 2 +- .../partition/snapshot/manager_test.go | 2 +- internal/tempdir/clean.go | 2 +- internal/tempdir/clean_test.go | 16 +++++---- internal/tempdir/tempdir.go | 2 +- 15 files changed, 61 insertions(+), 36 deletions(-) diff --git a/internal/cache/walker.go b/internal/cache/walker.go index 9444540c438..4db46e24d12 100644 --- a/internal/cache/walker.go +++ b/internal/cache/walker.go @@ -6,6 +6,7 @@ package cache import ( + "context" "errors" "fmt" "io/fs" @@ -149,7 +150,7 @@ func (c *DiskCache) moveAndClear(storage config.Storage) error { logger := c.logger.WithField("storage", storage.Name) logger.Info("clearing disk cache object folder") - tempPath, err := c.locator.TempDir(storage.Name) + tempPath, err := c.locator.TempDir(context.Background(), storage.Name) if err != nil { return fmt.Errorf("temp dir: %w", err) } diff --git a/internal/git/localrepo/paths.go b/internal/git/localrepo/paths.go index e463334b736..aa15f70d4bd 100644 --- a/internal/git/localrepo/paths.go +++ b/internal/git/localrepo/paths.go @@ -59,7 +59,7 @@ func (repo *Repo) ObjectDirectoryPath(ctx context.Context) (string, error) { // have a repository-specific prefix which we must check in order to determine whether the // quarantine directory does in fact belong to the repo at hand. 
if _, origError := storage.ValidateRelativePath(repoPath, objectDirectoryPath); origError != nil { - tempDir, err := repo.locator.TempDir(repo.GetStorageName()) + tempDir, err := repo.locator.TempDir(ctx, repo.GetStorageName()) if err != nil { return "", structerr.NewInvalidArgument("getting storage's temporary directory: %w", err) } diff --git a/internal/git/localrepo/repo.go b/internal/git/localrepo/repo.go index 514c9eba8b5..2d19d8a9eb3 100644 --- a/internal/git/localrepo/repo.go +++ b/internal/git/localrepo/repo.go @@ -222,8 +222,8 @@ func errorWithStderr(err error, stderr []byte) error { // StorageTempDir returns the temporary dir for the storage where the repo is on. // When this directory does not exist yet, it's being created. -func (repo *Repo) StorageTempDir() (string, error) { - tempPath, err := repo.locator.TempDir(repo.GetStorageName()) +func (repo *Repo) StorageTempDir(ctx context.Context) (string, error) { + tempPath, err := repo.locator.TempDir(ctx, repo.GetStorageName()) if err != nil { return "", err } diff --git a/internal/git/localrepo/repo_test.go b/internal/git/localrepo/repo_test.go index 8cfa3f0120b..e442a7f7e75 100644 --- a/internal/git/localrepo/repo_test.go +++ b/internal/git/localrepo/repo_test.go @@ -260,11 +260,11 @@ func TestRepo_StorageTempDir(t *testing.T) { }) repo := New(testhelper.NewLogger(t), locator, gitCmdFactory, catfileCache, repoProto) - expected, err := locator.TempDir(cfg.Storages[0].Name) + expected, err := locator.TempDir(ctx, cfg.Storages[0].Name) require.NoError(t, err) require.NoDirExists(t, expected) - tempPath, err := repo.StorageTempDir() + tempPath, err := repo.StorageTempDir(ctx) require.NoError(t, err) require.DirExists(t, expected) require.Equal(t, expected, tempPath) diff --git a/internal/git/objectpool/pool.go b/internal/git/objectpool/pool.go index cd1d297ee8c..46e93acf140 100644 --- a/internal/git/objectpool/pool.go +++ b/internal/git/objectpool/pool.go @@ -64,7 +64,7 @@ func FromProto( // When creating 
repositories in the ObjectPool service we will first create the // repository in a temporary directory. So we need to check whether the path we see // here is in such a temporary directory and let it pass. - tempDir, err := locator.TempDir(proto.GetRepository().GetStorageName()) + tempDir, err := locator.TempDir(ctx, proto.GetRepository().GetStorageName()) if err != nil { return nil, fmt.Errorf("getting temporary storage directory: %w", err) } diff --git a/internal/git/quarantine/quarantine.go b/internal/git/quarantine/quarantine.go index ffe03515901..11df75e5900 100644 --- a/internal/git/quarantine/quarantine.go +++ b/internal/git/quarantine/quarantine.go @@ -40,7 +40,13 @@ func New(ctx context.Context, repo *gitalypb.Repository, logger log.Logger, loca return nil, nil, structerr.NewInternal("getting repo path: %w", err) } - quarantineDir, cleanup, err := tempdir.NewWithPrefix(ctx, repo.GetStorageName(), + // Use context.Background() so that the quarantineDir is always created in the + // root storage path (e.g., `some/path/storages.d/default/`), and not in the + // snapshot storage path (e.g., `some/path/storages.d/default/staging/snapshots/1`). + // The reason is that certain read-only endpoints, such as ListConflictFiles() and CommitDelta(), + // need to create a quarantine directory, but their snapshot storage path is read-only. + // Creating quarantineDir in the snapshot storage path will cause permission errors. 
+ quarantineDir, cleanup, err := tempdir.NewWithPrefix(context.Background(), repo.GetStorageName(), storage.QuarantineDirectoryPrefix(repo), logger, locator) if err != nil { return nil, nil, fmt.Errorf("creating quarantine: %w", err) diff --git a/internal/gitaly/config/locator.go b/internal/gitaly/config/locator.go index bc6bc2740e6..2965e6d58b3 100644 --- a/internal/gitaly/config/locator.go +++ b/internal/gitaly/config/locator.go @@ -165,30 +165,42 @@ func (l *configLocator) GetStorageByName(ctx context.Context, storageName string // CacheDir returns the path to the cache dir for a storage. func (l *configLocator) CacheDir(storageName string) (string, error) { - return l.getPath(storageName, cachePrefix) + return l.getPath(context.Background(), storageName, cachePrefix) } // StateDir returns the path to the state dir for a storage. func (l *configLocator) StateDir(storageName string) (string, error) { - return l.getPath(storageName, statePrefix) + return l.getPath(context.Background(), storageName, statePrefix) } // TempDir returns the path to the temp dir for a storage. -func (l *configLocator) TempDir(storageName string) (string, error) { - return l.getPath(storageName, tmpRootPrefix) +func (l *configLocator) TempDir(ctx context.Context, storageName string) (string, error) { + return l.getPath(ctx, storageName, tmpRootPrefix) } // PartitionsDir returns the path to the partitions dir for a storage. func (l *configLocator) PartitionsDir(storageName string) (string, error) { - return l.getPath(storageName, partitionsPrefix) + return l.getPath(context.Background(), storageName, partitionsPrefix) } -func (l *configLocator) getPath(storageName, prefix string) (string, error) { - storagePath, ok := l.conf.StoragePath(storageName) - if !ok { - return "", structerr.NewInvalidArgument("%s dir: no such storage: %q", - filepath.Base(prefix), storageName) +// getPath retrieves the storage path and returns a path by joining it with the given prefix. 
+// If there is a transaction in the context, the transaction’s filesystem root will be used as +// the storage path, e.g. `private/tmp/gitaly-xx/yy/storages.d/default/staging/snapshots/1`. +// If there is no transaction in the context, the root storage path (as defined in the Gitaly +// TOML config) will be used, e.g. `private/tmp/gitaly-xx/yy/storages.d/default`. +// If the root storage path is required explicitly, an empty context (e.g. context.Background()) +// can be passed on purpose. +func (l *configLocator) getPath(ctx context.Context, storageName, prefix string) (string, error) { + var storagePath string + if txn := storage.ExtractTransaction(ctx); txn != nil { + storagePath = txn.FS().Root() + } else { + var ok bool + storagePath, ok = l.conf.StoragePath(storageName) + if !ok { + return "", structerr.NewInvalidArgument("%s dir: no such storage: %q", + filepath.Base(prefix), storageName) + } } - return filepath.Join(storagePath, prefix), nil } diff --git a/internal/gitaly/config/locator_test.go b/internal/gitaly/config/locator_test.go index 07d95f9e12d..215b9013768 100644 --- a/internal/gitaly/config/locator_test.go +++ b/internal/gitaly/config/locator_test.go @@ -238,24 +238,25 @@ func TestConfigLocator_StateDir(t *testing.T) { func TestConfigLocator_TempDir(t *testing.T) { t.Parallel() const storageName = "exists" + ctx := testhelper.Context(t) cfg := testcfg.Build(t, testcfg.WithStorages(storageName, "removed")) locator := config.NewLocator(cfg) t.Run("storage exists", func(t *testing.T) { - path, err := locator.TempDir(storageName) + path, err := locator.TempDir(ctx, storageName) require.NoError(t, err) require.Equal(t, path, filepath.Join(cfg.Storages[0].Path, "+gitaly/tmp")) }) t.Run("storage doesn't exist on disk", func(t *testing.T) { require.NoError(t, os.RemoveAll(cfg.Storages[1].Path)) - path, err := locator.TempDir(cfg.Storages[1].Name) + path, err := locator.TempDir(ctx, cfg.Storages[1].Name) require.NoError(t, err) require.Equal(t, 
filepath.Join(cfg.Storages[1].Path, "+gitaly/tmp"), path) }) t.Run("unknown storage", func(t *testing.T) { - _, err := locator.TempDir("unknown") + _, err := locator.TempDir(ctx, "unknown") require.Equal(t, structerr.NewInvalidArgument(`tmp dir: no such storage: "unknown"`), err) }) } diff --git a/internal/gitaly/linguist/language_stats.go b/internal/gitaly/linguist/language_stats.go index 48e92c64a21..0d192b9446f 100644 --- a/internal/gitaly/linguist/language_stats.go +++ b/internal/gitaly/linguist/language_stats.go @@ -138,14 +138,17 @@ func (c *languageStats) save(ctx context.Context, repo *localrepo.Repo, commitID if err != nil { return fmt.Errorf("getting relative path: %w", err) } - tempDir = tx.FS().Root() + tempDir, err = repo.StorageTempDir(ctx) + if err != nil { + return fmt.Errorf("locating temp dir: %w", err) + } finalPath = filepath.Join(tx.FS().Root(), relPath, languageStatsFilename) recordFunc = func() error { return tx.FS().RecordFile(filepath.Join(relPath, languageStatsFilename)) } } else { // Non-transaction path - tempDir, err = repo.StorageTempDir() + tempDir, err = repo.StorageTempDir(context.Background()) if err != nil { return fmt.Errorf("locating temp dir: %w", err) } diff --git a/internal/gitaly/repoutil/remove.go b/internal/gitaly/repoutil/remove.go index d74429e7224..99b09ac4537 100644 --- a/internal/gitaly/repoutil/remove.go +++ b/internal/gitaly/repoutil/remove.go @@ -66,7 +66,7 @@ func remove( } } - tempDir, err := locator.TempDir(repository.GetStorageName()) + tempDir, err := locator.TempDir(ctx, repository.GetStorageName()) if err != nil { return structerr.NewInternal("temporary directory: %w", err) } diff --git a/internal/gitaly/storage/locator.go b/internal/gitaly/storage/locator.go index 9be1ae0f6a6..ae7c4afdda5 100644 --- a/internal/gitaly/storage/locator.go +++ b/internal/gitaly/storage/locator.go @@ -92,7 +92,7 @@ type Locator interface { // CacheDir returns the path to the cache dir for a storage. 
CacheDir(storageName string) (string, error) // TempDir returns the path to the temp dir for a storage. - TempDir(storageName string) (string, error) + TempDir(ctx context.Context, storageName string) (string, error) // StateDir returns the path to the state dir for a storage. StateDir(storageName string) (string, error) // PartitionsDir returns the path to the partitions dir for a storage. diff --git a/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go b/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go index 73c6dd7da9b..b0f722dec5e 100644 --- a/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/snapshot/manager_test.go @@ -148,7 +148,7 @@ func TestManager(t *testing.T) { require.Equal(t, fs1.Root(), fs2.Root()) // Writing into shared snapshots is not allowed. - require.ErrorIs(t, os.WriteFile(filepath.Join(fs1.Root(), "some file"), nil, fs.ModePerm), os.ErrPermission) + require.ErrorIs(t, os.WriteFile(filepath.Join(fs1.Root(), "repositories", "some file"), nil, fs.ModePerm), os.ErrPermission) expectedDirectoryState := testhelper.DirectoryState{ "/": {Mode: ModeReadOnlyDirectory}, diff --git a/internal/tempdir/clean.go b/internal/tempdir/clean.go index cdd957fdf35..7d40b30d57b 100644 --- a/internal/tempdir/clean.go +++ b/internal/tempdir/clean.go @@ -63,7 +63,7 @@ func clean(logger log.Logger, locator storage.Locator, storage config.Storage) e ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dir, err := locator.TempDir(storage.Name) + dir, err := locator.TempDir(ctx, storage.Name) if err != nil { return fmt.Errorf("temporary dir: %w", err) } diff --git a/internal/tempdir/clean_test.go b/internal/tempdir/clean_test.go index a608ad8fe00..e20bfc07e0d 100644 --- a/internal/tempdir/clean_test.go +++ b/internal/tempdir/clean_test.go @@ -1,6 +1,7 @@ package tempdir import ( + "context" "os" "path/filepath" "testing" @@ -16,10 +17,11 @@ 
import ( ) func TestCleanSuccess(t *testing.T) { + ctx := testhelper.Context(t) cfg := testcfg.Build(t) locator := config.NewLocator(cfg) - cleanRoot, err := locator.TempDir(cfg.Storages[0].Name) + cleanRoot, err := locator.TempDir(ctx, cfg.Storages[0].Name) require.NoError(t, err) require.NoError(t, os.MkdirAll(cleanRoot, mode.Directory), "create clean root before setup") @@ -90,7 +92,7 @@ type mockLocator struct { storage.Locator } -func (m mockLocator) TempDir(storageName string) (string, error) { +func (m mockLocator) TempDir(ctx context.Context, storageName string) (string, error) { return "something", nil } @@ -129,19 +131,19 @@ func TestDedupStorages(t *testing.T) { } func chmod(t *testing.T, locator storage.Locator, storage config.Storage, p string, mode os.FileMode) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) require.NoError(t, os.Chmod(filepath.Join(root, p), mode)) } func chtimes(t *testing.T, locator storage.Locator, storage config.Storage, p string, date time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) require.NoError(t, os.Chtimes(filepath.Join(root, p), date, date)) } func assertEntries(t *testing.T, locator storage.Locator, storage config.Storage, entries ...string) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) foundEntries, err := os.ReadDir(root) @@ -155,7 +157,7 @@ func assertEntries(t *testing.T, locator storage.Locator, storage config.Storage } func makeFile(t *testing.T, locator storage.Locator, storage config.Storage, filePath string, mtime time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) fullPath := filepath.Join(root, filePath) @@ -164,7 +166,7 @@ func 
makeFile(t *testing.T, locator storage.Locator, storage config.Storage, fil } func makeDir(t *testing.T, locator storage.Locator, storage config.Storage, dirPath string, mtime time.Time) { - root, err := locator.TempDir(storage.Name) + root, err := locator.TempDir(context.Background(), storage.Name) require.NoError(t, err) fullPath := filepath.Join(root, dirPath) diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go index 923cdf01212..de7a47ccada 100644 --- a/internal/tempdir/tempdir.go +++ b/internal/tempdir/tempdir.go @@ -80,7 +80,7 @@ func NewRepository(ctx context.Context, storageName string, logger log.Logger, l } func newDirectory(ctx context.Context, storageName string, prefix string, logger log.Logger, loc storage.Locator) (Dir, error) { - root, err := loc.TempDir(storageName) + root, err := loc.TempDir(ctx, storageName) if err != nil { return Dir{}, fmt.Errorf("temp directory: %w", err) } -- GitLab From b32d471a78c0e6c5186478732e257f94f6ac7054 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Wed, 20 Aug 2025 11:12:05 -0300 Subject: [PATCH 4/6] storage: Add method to explicitly ask for storage root MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Once the relative paths of original and snapshot repositories are aligned, the storage path of a repository becomes dynamic. Depending on whether the repository is a snapshot, its storage can be either: - The root storage (e.g. `gitaly-xxx/yyy/storages.d/default`) - A snapshot file system root (e.g. `gitaly-xxx/yyy/storages.d/default/+gitaly/staging/zz/snapshots/1`) When working with a snapshot, we sometimes need to explicitly query the original repository’s storage root. This commit adds a method and an option to support this. When using the method or passing the option, we will always return the root storage. 
--- internal/gitaly/config/locator.go | 25 ++++++++++++++++++++++++ internal/gitaly/storage/locator.go | 31 ++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/internal/gitaly/config/locator.go b/internal/gitaly/config/locator.go index 2965e6d58b3..4fab7fae100 100644 --- a/internal/gitaly/config/locator.go +++ b/internal/gitaly/config/locator.go @@ -67,6 +67,9 @@ func (l *configLocator) ValidateRepository(ctx context.Context, repo storage.Rep } storagePath, err := l.GetStorageByName(ctx, repo.GetStorageName()) + if cfg.UseRootStorage { + storagePath, err = l.GetRootStoragePathByName(repo.GetStorageName()) + } if err != nil { return err } @@ -134,12 +137,19 @@ func (l *configLocator) GetRepoPath(ctx context.Context, repo storage.Repository storage.WithSkipRepositoryExistenceCheck(), } } + if cfg.UseRootStorage { + validationOptions = append(validationOptions, storage.WithValidateUsingRootStorage()) + } if err := l.ValidateRepository(ctx, repo, validationOptions...); err != nil { return "", err } storagePath, err := l.GetStorageByName(ctx, repo.GetStorageName()) + if cfg.UseRootStorage { + storagePath, err = l.GetRootStoragePathByName(repo.GetStorageName()) + } + if err != nil { return "", err } @@ -163,6 +173,21 @@ func (l *configLocator) GetStorageByName(ctx context.Context, storageName string return storagePath, nil } +// GetRootStoragePathByName returns the storage path configured for storageName in the +// Gitaly config, regardless of any transaction. An error is returned if it cannot be found. +func (l *configLocator) GetRootStoragePathByName(storageName string) (string, error) { + if storageName == "" { + return "", structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) + } + + storagePath, ok := l.conf.StoragePath(storageName) + if !ok { + return "", storage.NewStorageNotFoundError(storageName) + } + + return storagePath, nil +} + // CacheDir returns the path to the cache dir for a storage.
func (l *configLocator) CacheDir(storageName string) (string, error) { return l.getPath(context.Background(), storageName, cachePrefix) diff --git a/internal/gitaly/storage/locator.go b/internal/gitaly/storage/locator.go index ae7c4afdda5..3eb16b2c6c5 100644 --- a/internal/gitaly/storage/locator.go +++ b/internal/gitaly/storage/locator.go @@ -85,9 +85,13 @@ type Locator interface { // will be skipped. The errors returned are gRPC errors with relevant error codes and should be // passed back to gRPC without further decoration. GetRepoPath(ctx context.Context, repo Repository, opts ...GetRepoPathOption) (string, error) - // GetStorageByName will return the path for the storage, which is fetched by - // its key. An error is return if it cannot be found. + // GetStorageByName returns the storage path for the given name. + // If running inside a transaction, it returns the snapshot filesystem root. + // Otherwise, it looks up the storage by key in the Gitaly config. + // An error is returned if the key cannot be found. GetStorageByName(ctx context.Context, storageName string) (string, error) + // GetRootStoragePathByName always returns the storage path configured in the Gitaly TOML config. + GetRootStoragePathByName(storageName string) (string, error) // CacheDir returns the path to the cache dir for a storage. CacheDir(storageName string) (string, error) @@ -112,6 +116,9 @@ type ValidateRepositoryConfig struct { // verify that whether the repository _would_ be valid if it existed, but not verify actual // existence. SkipRepositoryExistenceCheck bool + // UseRootStorage forces using the root storage path defined in the config, + // instead of the transaction’s snapshot root, when a transaction is present. + UseRootStorage bool } // ValidateRepositoryOption is an option that can be passed to ValidateRepository.
@@ -133,11 +140,23 @@ func WithSkipStorageExistenceCheck() ValidateRepositoryOption { } } +// WithValidateUsingRootStorage causes ValidateRepository to always use the storage path +// as defined in the Gitaly config, even when running inside a transaction. +func WithValidateUsingRootStorage() ValidateRepositoryOption { + return func(cfg *ValidateRepositoryConfig) { + cfg.UseRootStorage = true + } +} + // GetRepoPathConfig is used to configure GetRepoPath. type GetRepoPathConfig struct { // SkipRepositoryVerification will cause GetRepoPath to skip verification the verification whether the // computed path is an actual Git repository or not. SkipRepositoryVerification bool + + // UseRootStorage forces GetRepoPath to use the root storage path from the config + // instead of the transaction’s snapshot root when a transaction is active. + UseRootStorage bool } // GetRepoPathOption can be passed to GetRepoPath to change its default behavior. @@ -151,6 +170,14 @@ func WithRepositoryVerificationSkipped() GetRepoPathOption { } } +// WithRootStorage causes GetRepoPath to always return the repository path from the +// root storage, i.e., the storage path defined in the config. +func WithRootStorage() GetRepoPathOption { + return func(cfg *GetRepoPathConfig) { + cfg.UseRootStorage = true + } +} + // ValidateRelativePath validates a relative path by joining it with rootDir and verifying the result // is either rootDir or a path within rootDir. Returns clean relative path from rootDir to relativePath // or an ErrRelativePathEscapesRoot if the resulting path is not contained within rootDir. -- GitLab From bb7eafdb5e365a1bedf8ba3d147891a4abf1afcb Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Tue, 19 Aug 2025 10:57:55 -0300 Subject: [PATCH 5/6] transaction: Hide snapshot prefix from transactions Currently, TransactionManager isolates transactions by creating a snapshot of the accessed repository. 
It places the snapshot in the staging directory and rewrites the relative path to include a snapshot prefix, e.g. `staging/x/y/@hashed/...`. Some operations use the relative path as a key. This causes issues because the path becomes transaction-specific and no longer works as a stable key. We previously worked around this by using the OriginalRepository helper, but this is a leaky abstraction. Transactions should not see `staging/x/y/...` prefixes. This commit adds logic in the locator to extract the transaction from context, retrieve the snapshot root directory, and return that as the storage path. This hides the snapshot prefix and keeps the relative paths of original and snapshot repositories aligned. --- .../git/gitcmd/command_factory_cgroup_test.go | 12 +++- internal/gitaly/config/locator.go | 7 +-- internal/gitaly/hook/postreceive.go | 3 + internal/gitaly/hook/referencetransaction.go | 3 + .../service/hook/reference_transaction.go | 2 +- .../storage/storagemgr/middleware_test.go | 14 +++-- .../migration/leftover_file_migration.go | 2 +- .../storagemgr/partition/migration/manager.go | 2 + .../migration/xxxx_ref_backend_migration.go | 2 + .../storagemgr/partition/testhelper_test.go | 43 +++++++------ .../partition/transaction_manager.go | 63 ++++++++++++------- .../transaction_manager_housekeeping.go | 31 ++++----- 12 files changed, 113 insertions(+), 71 deletions(-) diff --git a/internal/git/gitcmd/command_factory_cgroup_test.go b/internal/git/gitcmd/command_factory_cgroup_test.go index ce50897e629..fedca49eeff 100644 --- a/internal/git/gitcmd/command_factory_cgroup_test.go +++ b/internal/git/gitcmd/command_factory_cgroup_test.go @@ -9,7 +9,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/cgroups" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -35,6 +37,10 @@ func (m *mockCgroupsManager) CloneIntoCgroup(c *exec.Cmd, _ ...cgroups.AddComman return "", io.NopCloser(nil), nil } +func (m *mockTransaction) FS() storage.FS { + return m.fs +} + func TestNewCommandAddsToCgroup(t *testing.T) { t.Parallel() @@ -65,6 +71,7 @@ func TestNewCommandAddsToCgroup(t *testing.T) { type mockTransaction struct { storage.Transaction originalRepo *gitalypb.Repository + fs storage.FS } func (m *mockTransaction) OriginalRepository(storage.Repository) *gitalypb.Repository { @@ -110,9 +117,12 @@ func TestNewCommandCgroupStable(t *testing.T) { defer cleanup() originalRepo := &gitalypb.Repository{StorageName: "default", RelativePath: "some/relative/path"} - + locator := config.NewLocator(cfg) + storagePath, err := locator.GetStorageByName(ctx, "default") + require.NoError(t, err) ctx = storage.ContextWithTransaction(ctx, &mockTransaction{ originalRepo: originalRepo, + fs: fsrecorder.NewFS(storagePath, nil), }) cmd, err := gitCmdFactory.New(ctx, repo, gitcmd.Command{ diff --git a/internal/gitaly/config/locator.go b/internal/gitaly/config/locator.go index 4fab7fae100..91b17f94f09 100644 --- a/internal/gitaly/config/locator.go +++ b/internal/gitaly/config/locator.go @@ -165,12 +165,11 @@ func (l *configLocator) GetStorageByName(ctx context.Context, storageName string return "", structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) } - storagePath, ok := l.conf.StoragePath(storageName) - if !ok { - return "", storage.NewStorageNotFoundError(storageName) + if tx := storage.ExtractTransaction(ctx); tx != nil { + return tx.FS().Root(), nil } - return storagePath, nil + return l.GetRootStoragePathByName(storageName) } // GetRootStoragePathByName will return the path for the 
storage, which is fetched by diff --git a/internal/gitaly/hook/postreceive.go b/internal/gitaly/hook/postreceive.go index 888d3dfaeea..85eea3935b8 100644 --- a/internal/gitaly/hook/postreceive.go +++ b/internal/gitaly/hook/postreceive.go @@ -154,6 +154,9 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. ReadOnly: true, RelativePath: originalRepo.GetRelativePath(), }) + // A new transaction is created and it should be put in the context to replace (or hide) the old closed + // one, so that `postReceiveHook` in the following logic can work on the correct transaction. + ctx = storage.ContextWithTransaction(ctx, tx) if err != nil { return fmt.Errorf("begin transaction: %w", err) } diff --git a/internal/gitaly/hook/referencetransaction.go b/internal/gitaly/hook/referencetransaction.go index 58d3b8c4e84..5297ec079b3 100644 --- a/internal/gitaly/hook/referencetransaction.go +++ b/internal/gitaly/hook/referencetransaction.go @@ -48,6 +48,9 @@ func (m *GitLabHookManager) ReferenceTransactionHook(ctx context.Context, state if err != nil { return fmt.Errorf("get transaction: %w", err) } + if tx != nil { + ctx = storage.ContextWithTransaction(ctx, tx) + } } var phase voting.Phase diff --git a/internal/gitaly/service/hook/reference_transaction.go b/internal/gitaly/service/hook/reference_transaction.go index 410e3b0a897..ce9ea4961ed 100644 --- a/internal/gitaly/service/hook/reference_transaction.go +++ b/internal/gitaly/service/hook/reference_transaction.go @@ -13,7 +13,7 @@ import ( ) func validateReferenceTransactionHookRequest(ctx context.Context, locator storage.Locator, in *gitalypb.ReferenceTransactionHookRequest) error { - return locator.ValidateRepository(ctx, in.GetRepository()) + return locator.ValidateRepository(ctx, in.GetRepository(), storage.WithSkipRepositoryExistenceCheck()) } func (s *server) ReferenceTransactionHook(stream gitalypb.HookService_ReferenceTransactionHookServer) error { diff --git 
a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go index 0cd3ca080f7..75359d583fa 100644 --- a/internal/gitaly/storage/storagemgr/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/middleware_test.go @@ -351,8 +351,7 @@ messages and behavior by erroring out the requests before they even hit this int }, assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) { expected := validAdditionalRepository() - // The additional repository's relative path should have been rewritten. - require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) }, @@ -384,8 +383,7 @@ messages and behavior by erroring out the requests before they even hit this int }, assertAdditionalRepository: func(t *testing.T, ctx context.Context, actual *gitalypb.Repository) { expected := validAdditionalRepository() - // The additional repository's relative path should have been rewritten. - require.NotEqual(t, expected.GetRelativePath(), actual.GetRelativePath()) + require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) }, @@ -535,8 +533,12 @@ messages and behavior by erroring out the requests before they even hit this int expectedRepo := validRepository() actualRepo := repo - // When run in a transaction, the relative path will be pointed to the snapshot. 
- assert.NotEqual(t, expectedRepo.GetRelativePath(), repo.GetRelativePath()) + // When run in a transaction, the relative path will be the same, but the + // transaction should be included in the ctx + tx := storage.ExtractTransaction(ctx) + assert.NotNil(t, tx, "transaction not in the context") + assert.Equal(t, expectedRepo.GetRelativePath(), repo.GetRelativePath(), + "relative path should be the same even when in transaction") expectedRepo.RelativePath = "" actualRepo.RelativePath = "" diff --git a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go index b4aa3d6f7f6..84e3ba22635 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/leftover_file_migration.go @@ -31,7 +31,7 @@ func NewLeftoverFileMigration(locator storage.Locator) Migration { Fn: func(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { // Use snapshotFilter to match entry paths that must be kept in the repo. 
snapshotFilter := snapshot.NewRegexSnapshotFilter() - storagePath, err := locator.GetStorageByName(ctx, storageName) + storagePath, err := locator.GetRootStoragePathByName(storageName) if err != nil { return fmt.Errorf("resolve storage path: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go index c5d1c0692ac..d46698e5443 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go @@ -160,6 +160,8 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B if err != nil { return fmt.Errorf("begin migration update: %w", err) } + ctx = storage.ContextWithTransaction(ctx, txn) + defer func() { if returnedErr != nil { if err := txn.Rollback(ctx); err != nil { diff --git a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go index f6c46409c22..9f31838b16b 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_ref_backend_migration.go @@ -29,6 +29,8 @@ func NewReferenceBackendMigration( ID: id, Name: name, Fn: func(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { + ctx = storage.ContextWithTransaction(ctx, tx) + scopedFactory, err := localRepoFactory.ScopeByStorage(ctx, storageName) if err != nil { return fmt.Errorf("creating storage scoped factory: %w", err) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 39e10b375c2..902d0a639f9 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1368,6 
+1368,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: transaction committed before beginning it") transaction := openTransactions[step.TransactionID] + ctxWithTxn := storage.ContextWithTransaction(ctx, transaction) if transaction.relativePath != "" { rewrittenRepo := setup.RepositoryFactory.Build( transaction.RewriteRepository(&gitalypb.Repository{ @@ -1378,7 +1379,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas if step.UpdateAlternate != nil { require.NoError(t, objectpool.Disconnect( - storage.ContextWithTransaction(ctx, transaction), + ctxWithTxn, transaction.FS(), rewrittenRepo, logger, @@ -1387,7 +1388,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas if step.UpdateAlternate.RelativePath != "" { require.NoError(t, objectpool.Link( - storage.ContextWithTransaction(ctx, transaction), + ctxWithTxn, setup.RepositoryFactory.Build(transaction.RewriteRepository(&gitalypb.Repository{ StorageName: setup.Config.Storages[0].Name, RelativePath: step.UpdateAlternate.RelativePath, @@ -1399,7 +1400,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } if step.UpdateGitConfig != nil { - updateGitConfig(t, ctx, rewrittenRepo, step.UpdateGitConfig, transaction) + updateGitConfig(t, ctxWithTxn, rewrittenRepo, step.UpdateGitConfig, transaction) } if step.QuarantinedPacks != nil { @@ -1415,24 +1416,24 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } for _, pack := range step.QuarantinedPacks { - require.NoError(t, rewrittenRepo.UnpackObjects(ctx, bytes.NewReader(pack))) + require.NoError(t, rewrittenRepo.UnpackObjects(ctxWithTxn, bytes.NewReader(pack))) } } if step.ReferenceUpdates != nil { - require.NoError(t, performReferenceUpdates(t, ctx, transaction, rewrittenRepo, step.ReferenceUpdates)) + require.NoError(t, 
performReferenceUpdates(t, ctxWithTxn, transaction, rewrittenRepo, step.ReferenceUpdates)) } if step.DefaultBranchUpdate != nil { - require.NoError(t, rewrittenRepo.SetDefaultBranch(storage.ContextWithTransaction(ctx, transaction), nil, step.DefaultBranchUpdate.Reference)) - require.NoError(t, transaction.UpdateReferences(ctx, map[git.ReferenceName]git.ReferenceUpdate{ + require.NoError(t, rewrittenRepo.SetDefaultBranch(ctxWithTxn, nil, step.DefaultBranchUpdate.Reference)) + require.NoError(t, transaction.UpdateReferences(ctxWithTxn, map[git.ReferenceName]git.ReferenceUpdate{ "HEAD": {NewTarget: step.DefaultBranchUpdate.Reference}, })) } if step.CustomHooksUpdate != nil { require.NoError(t, repoutil.SetCustomHooks( - storage.ContextWithTransaction(ctx, transaction), + ctxWithTxn, logger, config.NewLocator(setup.Config), nil, @@ -1443,7 +1444,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas if step.DeleteRepository { require.NoError(t, repoutil.Remove( - storage.ContextWithTransaction(ctx, transaction), + ctxWithTxn, logger, config.NewLocator(setup.Config), nil, @@ -1475,7 +1476,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it") transaction := openTransactions[step.TransactionID] - require.NoError(t, transaction.RecordInitialReferenceValues(ctx, step.InitialValues)) + require.NoError(t, transaction.RecordInitialReferenceValues(storage.ContextWithTransaction(ctx, transaction), step.InitialValues)) case UpdateReferences: require.Contains(t, openTransactions, step.TransactionID, "test error: reference updates aborted on committed before beginning it") @@ -1488,7 +1489,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas }), ) - require.Equal(t, step.ExpectedError, performReferenceUpdates(t, ctx, transaction, rewrittenRepo, 
step.ReferenceUpdates)) + require.Equal(t, step.ExpectedError, performReferenceUpdates(t, storage.ContextWithTransaction(ctx, transaction), transaction, rewrittenRepo, step.ReferenceUpdates)) case ReadKey: require.Contains(t, openTransactions, step.TransactionID, "test error: read key called on transaction before beginning it") @@ -1554,6 +1555,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: repository created in transaction before beginning it") transaction := openTransactions[step.TransactionID] + ctxWithTxn := storage.ContextWithTransaction(ctx, transaction) rewrittenRepository := transaction.RewriteRepository(&gitalypb.Repository{ StorageName: setup.Config.Storages[0].Name, @@ -1563,7 +1565,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas locator := config.NewLocator(setup.Config) require.NoError(t, repoutil.Create( - storage.ContextWithTransaction(ctx, transaction), + ctxWithTxn, logger, locator, setup.CommandFactory, @@ -1575,25 +1577,26 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas repo := setup.RepositoryFactory.Build(repoProto) if step.DefaultBranch != "" { - require.NoError(t, repo.SetDefaultBranch(ctx, nil, step.DefaultBranch)) + require.NoError(t, repo.SetDefaultBranch(ctxWithTxn, nil, step.DefaultBranch)) } for _, pack := range step.Packs { - require.NoError(t, repo.UnpackObjects(ctx, bytes.NewReader(pack))) + require.NoError(t, repo.UnpackObjects(ctxWithTxn, bytes.NewReader(pack))) } for name, oid := range step.References { - require.NoError(t, repo.UpdateRef(ctx, name, oid, setup.ObjectHash.ZeroOID)) + require.NoError(t, repo.UpdateRef(ctxWithTxn, name, oid, setup.ObjectHash.ZeroOID)) } if step.CustomHooks != nil { - require.NoError(t, - repoutil.SetCustomHooks(ctx, logger, config.NewLocator(setup.Config), nil, bytes.NewReader(step.CustomHooks), repo), - ) + repoPath, 
err := repo.Path(ctxWithTxn) + require.NoError(t, err) + require.NoError(t, repoutil.ExtractHooks(ctxWithTxn, logger, + bytes.NewReader(step.CustomHooks), repoPath, false)) } if step.Alternate != "" { - repoPath, err := repo.Path(ctx) + repoPath, err := repo.Path(ctxWithTxn) require.NoError(t, err) alternatesPath := stats.AlternatesFilePath(repoPath) @@ -1636,7 +1639,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] - RequireRepositories(t, ctx, setup.Config, + RequireRepositories(t, storage.ContextWithTransaction(ctx, transaction), setup.Config, // Assert the contents of the transaction's snapshot. filepath.Join(setup.Config.Storages[0].Path, transaction.snapshot.Prefix()), // Rewrite all of the repositories to point to their snapshots. diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 4b6717850f4..9e57041873f 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -413,20 +413,21 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn.fs = fsrecorder.NewFS(txn.snapshot.Root(), txn.walEntry) if txn.repositoryTarget() { - txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.snapshot.RelativePath(txn.relativePath)) + txn.repositoryExists, err = mgr.doesRepositoryExist(ctx, txn.relativePath) if err != nil { return nil, fmt.Errorf("does repository exist: %w", err) } - txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshot.RelativePath(txn.relativePath)) + txn.snapshotRepository = mgr.repositoryFactory.Build(txn.relativePath) if txn.write { if txn.repositoryExists { + ctxWithTxn := 
storage.ContextWithTransaction(ctx, txn) txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine") if err := os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), mode.Directory); err != nil { return nil, fmt.Errorf("create quarantine directory: %w", err) } - txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctx, txn.quarantineDirectory) + txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(ctxWithTxn, txn.quarantineDirectory) if err != nil { return nil, fmt.Errorf("quarantine: %w", err) } @@ -436,13 +437,13 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti return nil, fmt.Errorf("create reference recorder tmp dir: %w", err) } - refBackend, err := txn.snapshotRepository.ReferenceBackend(ctx) + refBackend, err := txn.snapshotRepository.ReferenceBackend(ctxWithTxn) if err != nil { return nil, fmt.Errorf("reference backend: %w", err) } if refBackend == git.ReferenceBackendFiles { - objectHash, err := txn.snapshotRepository.ObjectHash(ctx) + objectHash, err := txn.snapshotRepository.ObjectHash(ctxWithTxn) if err != nil { return nil, fmt.Errorf("object hash: %w", err) } @@ -453,7 +454,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti } if refBackend == git.ReferenceBackendReftables { - snapshotRepositoryPath, err := txn.snapshotRepository.Path(ctx) + snapshotRepositoryPath, err := txn.snapshotRepository.Path(ctxWithTxn) if err != nil { return nil, fmt.Errorf("snapshot repository path: %w", err) } @@ -516,13 +517,10 @@ func (txn *Transaction) PartitionRelativePaths() []string { // the repository in the transaction's snapshot. 
func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.Repository { rewritten := proto.Clone(repo).(*gitalypb.Repository) - rewritten.RelativePath = txn.snapshot.RelativePath(repo.GetRelativePath()) - if repo.GetRelativePath() == txn.relativePath { rewritten.GitObjectDirectory = txn.snapshotRepository.GetGitObjectDirectory() rewritten.GitAlternateObjectDirectories = txn.snapshotRepository.GetGitAlternateObjectDirectories() } - return rewritten } @@ -1078,6 +1076,7 @@ type resultChannel chan commitResult // commit queues the transaction for processing and returns once the result has been determined. func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) (storage.LSN, error) { span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() transaction.result = make(resultChannel, 1) @@ -1264,23 +1263,28 @@ func (mgr *TransactionManager) stageRepositoryCreation(ctx context.Context, tran // setupStagingRepository sets up a snapshot that is used for verifying and staging changes. It contains up to // date state of the partition. It does not have the quarantine configured. 
-func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, transaction *Transaction) (*localrepo.Repo, error) { +func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, transaction *Transaction) (context.Context, *localrepo.Repo, error) { defer trace.StartRegion(ctx, "setupStagingRepository").End() span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.setupStagingRepository", nil) defer span.Finish() if transaction.stagingSnapshot != nil { - return nil, errors.New("staging snapshot already setup") + return nil, nil, errors.New("staging snapshot already setup") } var err error transaction.stagingSnapshot, err = mgr.snapshotManager.GetSnapshot(ctx, []string{transaction.relativePath}, true) if err != nil { - return nil, fmt.Errorf("new snapshot: %w", err) + return nil, nil, fmt.Errorf("new snapshot: %w", err) } - return mgr.repositoryFactory.Build(transaction.stagingSnapshot.RelativePath(transaction.relativePath)), nil + // Wrap ctx with a minimal transaction whose FS is rooted at the staging snapshot. + stagingTxn := &Transaction{ + fs: fsrecorder.NewFS(transaction.stagingSnapshot.Root(), nil), + } + stagingCtx := storage.ContextWithTransaction(ctx, stagingTxn) + return stagingCtx, mgr.repositoryFactory.Build(transaction.relativePath), nil } // packPrefixRegexp matches the output of `git index-pack` where it @@ -1307,7 +1311,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return nil } - if _, err := os.Stat(mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath())); err != nil { + if _, err := os.Stat(mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath())); err != nil { if !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("stat: %w", err) } @@ -1318,6 +1322,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra } span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.packObjects", nil) + ctx = storage.ContextWithTransaction(ctx, 
transaction) defer 
span.Finish() // We want to only pack the objects that are present in the quarantine as they are potentially @@ -1494,7 +1499,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra // why we don't track 'tables.list' operation here. func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, transaction *Transaction) error { runPackRefs := transaction.runHousekeeping.packRefs - repoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) if err := allowReftableCompaction(repoPath); err != nil { return fmt.Errorf("allow reftable compaction: %w", err) @@ -1590,7 +1595,7 @@ func (mgr *TransactionManager) preparePackRefsFiles(ctx context.Context, transac // First walk to collect the list of loose refs. looseReferences := make(map[git.ReferenceName]struct{}) - repoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) + repoPath := mgr.getAbsolutePath(ctx, transaction.snapshotRepository.GetRelativePath()) if err := filepath.WalkDir(filepath.Join(repoPath, "refs"), func(path string, entry fs.DirEntry, err error) error { if err != nil { return err @@ -1764,7 +1769,8 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned transaction.result <- func() commitResult { var zeroOID git.ObjectID if transaction.repositoryTarget() { - repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath) + + repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath, storage.WithRootStorage()) if err != nil { return commitResult{error: fmt.Errorf("does repository exist: %w", err)} } @@ -1801,7 +1807,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned if refBackend == git.ReferenceBackendReftables || transaction.runHousekeeping != nil { if refBackend == git.ReferenceBackendReftables { if err := 
transaction.reftableRecorder.stageTables(ctx, - mgr.getAbsolutePath(transaction.relativePath), + mgr.getAbsolutePath(ctx, transaction.relativePath), transaction, ); err != nil { return commitResult{error: fmt.Errorf("stage tables: %w", err)} @@ -2100,10 +2106,10 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { } // doesRepositoryExist returns whether the repository exists or not. -func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relativePath string) (bool, error) { +func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relativePath string, opts ...storage.GetRepoPathOption) (bool, error) { defer trace.StartRegion(ctx, "doesRepositoryExist").End() - stat, err := os.Stat(mgr.getAbsolutePath(relativePath)) + stat, err := os.Stat(mgr.getAbsolutePath(ctx, relativePath, opts...)) if err != nil { if errors.Is(err, fs.ErrNotExist) { return false, nil @@ -2119,9 +2125,20 @@ func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relative return true, nil } -// getAbsolutePath returns the relative path's absolute path in the storage. -func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string { - return filepath.Join(append([]string{mgr.storagePath}, relativePath...)...) +// getAbsolutePath returns the absolute path of a relative path within the storage. +// If the GetRepoPathOption UseRootStorage is set, it returns the repository’s original +// path in the root storage (i.e., the storage defined in the Gitaly config). 
+func (mgr *TransactionManager) getAbsolutePath(ctx context.Context, relativePath string, opts ...storage.GetRepoPathOption) string { + var cfg storage.GetRepoPathConfig + for _, opt := range opts { + opt(&cfg) + } + + if txn := storage.ExtractTransaction(ctx); txn != nil && !cfg.UseRootStorage { + return filepath.Join(txn.FS().Root(), relativePath) + } + + return filepath.Join(mgr.storagePath, relativePath) } // packFilePath returns a log entry's pack file's absolute path in the wal files directory. diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go index e607bb8d7d6..6dedeeb9732 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go @@ -122,6 +122,7 @@ func (mgr *TransactionManager) prepareHousekeeping(ctx context.Context, transact } span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareHousekeeping", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "prepare") @@ -214,8 +215,8 @@ func (mgr *TransactionManager) prepareRepacking(ctx context.Context, transaction // Build a working repository pointing to snapshot repository. Housekeeping task can access the repository // without the needs for quarantine. 
- workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) - repoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) + repoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) isFullRepack, err := housekeeping.ValidateRepacking(repack.config) if err != nil { @@ -422,7 +423,7 @@ func (mgr *TransactionManager) prepareCommitGraphs(ctx context.Context, transact } if err := housekeeping.WriteCommitGraph(ctx, - mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)), + mgr.repositoryFactory.Build(transaction.relativePath), transaction.runHousekeeping.writeCommitGraphs.config, ); err != nil { return fmt.Errorf("re-writing commit graph: %w", err) @@ -471,6 +472,7 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti defer trace.StartRegion(ctx, "verifyHousekeeping").End() span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.verifyHousekeeping", nil) + ctx = storage.ContextWithTransaction(ctx, transaction) defer span.Finish() finishTimer := mgr.metrics.housekeeping.ReportTaskLatency("total", "verify") @@ -535,7 +537,7 @@ func (mgr *TransactionManager) verifyPackRefs(ctx context.Context, transaction * defer finishTimer() if refBackend == git.ReferenceBackendReftables { - packRefs, err := mgr.verifyPackRefsReftable(transaction) + packRefs, err := mgr.verifyPackRefsReftable(ctx, transaction) if err != nil { return nil, fmt.Errorf("reftable backend: %w", err) } @@ -589,7 +591,7 @@ func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction // Setup a working repository of the destination repository and all changes of current transactions. All // concurrent changes must land in that repository already. 
- stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) + stagingCtx, stagingRepository, err := mgr.setupStagingRepository(ctx, transaction) if err != nil { return fmt.Errorf("setting up new snapshot for verifying repacking: %w", err) } @@ -626,7 +628,7 @@ func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction return fmt.Errorf("walking committed entries: %w", err) } - if err := mgr.verifyObjectsExist(ctx, stagingRepository, objectDependencies); err != nil { + if err := mgr.verifyObjectsExist(stagingCtx, stagingRepository, objectDependencies); err != nil { var errInvalidObject localrepo.InvalidObjectError if errors.As(err, &errInvalidObject) { return errRepackConflictPrunedObject @@ -643,7 +645,7 @@ func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction // We merge the tables.list generated by our compaction with the existing // repositories tables.list. Because there could have been new tables after // we performed compaction. -func (mgr *TransactionManager) verifyPackRefsReftable(transaction *Transaction) (*gitalypb.LogEntry_Housekeeping_PackRefs, error) { +func (mgr *TransactionManager) verifyPackRefsReftable(ctx context.Context, transaction *Transaction) (*gitalypb.LogEntry_Housekeeping_PackRefs, error) { tables := transaction.runHousekeeping.packRefs.reftablesAfter if len(tables) < 1 { return nil, nil @@ -653,14 +655,13 @@ func (mgr *TransactionManager) verifyPackRefsReftable(transaction *Transaction) // repository before the compaction. However, concurrent writes might have occurred which // wrote new tables to the target repository. We shouldn't loose that data. So we merge // the compacted tables.list with the newer tables from the target repository's tables.list. 
- repoPath := mgr.getAbsolutePath(transaction.relativePath) + repoPath := mgr.getAbsolutePath(ctx, transaction.relativePath, storage.WithRootStorage()) newTableList, err := reftable.ReadTablesList(repoPath) if err != nil { return nil, fmt.Errorf("reading tables.list: %w", err) } - snapshotRepoPath := mgr.getAbsolutePath(transaction.snapshotRepository.GetRelativePath()) - + snapshotRepoPath := mgr.getAbsolutePath(storage.ContextWithTransaction(ctx, transaction), transaction.snapshotRepository.GetRelativePath()) // tables.list is hard-linked from the repository to the snapshot, we shouldn't // directly write to it as we'd modify the original. So let's remove the // hard-linked file. @@ -788,7 +789,7 @@ func (mgr *TransactionManager) verifyPackRefsFiles(ctx context.Context, transact if isDir { // If this is a directory, we need to ensure it is actually empty before removing // it. Check if we find any directory entries we haven't yet deleted. - entries, err := os.ReadDir(mgr.getAbsolutePath(relativePath)) + entries, err := os.ReadDir(mgr.getAbsolutePath(ctx, relativePath, storage.WithRootStorage())) if err != nil { return fmt.Errorf("read dir: %w", err) } @@ -843,10 +844,10 @@ func (mgr *TransactionManager) prepareOffloading(ctx context.Context, transactio // Loading configurations for offloading cfg := transaction.runHousekeeping.runOffloading.config - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) // Find the original repository's absolute path.
In the context of transaction, originalRepo is the repo // which we are taking a snapshot of. originalRepo := &gitalypb.Repository{ @@ -979,10 +980,10 @@ func (mgr *TransactionManager) prepareRehydrating(ctx context.Context, transacti span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.prepareRehydrating", nil) defer span.Finish() - workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.RelativePath(transaction.relativePath)) + workingRepository := mgr.repositoryFactory.Build(transaction.relativePath) // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. - workingRepoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath()) + workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) prefix := transaction.runHousekeeping.runRehydrating.prefix packFilesToDownload, err := mgr.offloadingSink.List(ctx, prefix) -- GitLab From e9545769651e97e6d02d18cd44f97ecd8ac20f96 Mon Sep 17 00:00:00 2001 From: Eric Ju Date: Mon, 29 Sep 2025 20:17:50 -0400 Subject: [PATCH 6/6] partition: Remove OriginalRepository helper Previously, TransactionManager isolated transactions by creating snapshots of accessed repositories. It placed these snapshots in the staging directory and modified the relative path to include a snapshot prefix, e.g. `staging/x/y/@hashed/...`. This caused problems for operations that rely on the relative path as a key, since the path became transaction-specific and could no longer serve as a stable identifier. To work around this, we used the OriginalRepository helper. But this was a leaky abstraction: transactions should never be exposed to `staging/x/y/...` prefixes. Now that original and snapshot repositories share the same relative path, the OriginalRepository helper is no longer needed. This commit removes it. 
--- internal/backup/repository.go | 2 +- internal/bundleuri/manager.go | 19 +++---------- internal/bundleuri/strategy_occurences.go | 10 ------- internal/git/gitcmd/command_factory.go | 3 --- .../git/gitcmd/command_factory_cgroup_test.go | 14 +++------- .../manager/optimize_repository.go | 27 ++++++------------- .../manager/optimize_repository_test.go | 8 ------ internal/gitaly/hook/postreceive.go | 10 +++---- internal/gitaly/hook/prereceive.go | 5 +++- internal/gitaly/service/hook/pack_objects.go | 10 +------ internal/gitaly/service/objectpool/create.go | 2 +- internal/gitaly/service/objectpool/delete.go | 2 +- .../objectpool/fetch_into_object_pool.go | 4 +-- internal/gitaly/service/objectpool/get.go | 6 ----- .../gitaly/service/repository/create_fork.go | 2 +- .../service/repository/create_repository.go | 2 +- .../create_repository_from_bundle.go | 2 +- .../create_repository_from_snapshot.go | 2 +- .../repository/create_repository_from_url.go | 2 +- .../repository/migrate_reference_backend.go | 2 +- internal/gitaly/service/repository/remove.go | 2 +- .../gitaly/service/repository/replicate.go | 4 +-- internal/gitaly/service/smarthttp/cache.go | 10 ------- internal/gitaly/storage/storage.go | 2 -- .../storage/storagemgr/middleware_test.go | 4 +-- .../migration/reftable/middleware.go | 4 --- .../partition/transaction_manager.go | 15 ----------- .../transaction_manager_housekeeping.go | 10 +------ internal/praefect/coordinator_test.go | 9 +++---- 29 files changed, 43 insertions(+), 151 deletions(-) diff --git a/internal/backup/repository.go b/internal/backup/repository.go index 3108d50e6e8..b8b32f4031c 100644 --- a/internal/backup/repository.go +++ b/internal/backup/repository.go @@ -764,7 +764,7 @@ func (r *localRepository) Create(ctx context.Context, hash git.ObjectHash, defau if err := r.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(repo).GetRelativePath(), + repo.GetRelativePath(), ); err != nil { return fmt.Errorf("recording migration 
key: %w", err) } diff --git a/internal/bundleuri/manager.go b/internal/bundleuri/manager.go index c8d02e293d4..513a2f9cf0a 100644 --- a/internal/bundleuri/manager.go +++ b/internal/bundleuri/manager.go @@ -118,9 +118,7 @@ func (g *GenerationManager) Generate(ctx context.Context, repo *localrepo.Repo) g.logger.WithError(err).Error("generate bundle: nil node manager within transaction") return nil } - - originalRepo := tx.OriginalRepository(repoProto) - strg, err := g.nodeManager.GetStorage(originalRepo.GetStorageName()) + strg, err := g.nodeManager.GetStorage(repoProto.GetStorageName()) if err != nil { g.logger.WithError(err).Error("generate bundle: error getting storage") return nil @@ -128,7 +126,7 @@ func (g *GenerationManager) Generate(ctx context.Context, repo *localrepo.Repo) // Create the transaction on the new context created above ntx, err := strg.Begin(gCtx, storage.TransactionOptions{ ReadOnly: true, - RelativePath: originalRepo.GetRelativePath(), + RelativePath: repoProto.GetRelativePath(), }) if err != nil { g.logger.WithError(err).Error("generate bundle: no transaction found") @@ -139,7 +137,7 @@ func (g *GenerationManager) Generate(ctx context.Context, repo *localrepo.Repo) // bundle generation. So once the bundle is generated, we must abort // to free the snapshot. defer func() { _ = ntx.Rollback(gCtx) }() - bundlePath = bundleRelativePath(originalRepo, defaultBundle) + bundlePath = bundleRelativePath(repoProto, defaultBundle) } writer := backup.NewLazyWriter(func() (io.WriteCloser, error) { @@ -176,17 +174,6 @@ func (g *GenerationManager) Generate(ctx context.Context, repo *localrepo.Repo) // SignedURL returns a public URL to give anyone access to download the bundle from. 
func (g *GenerationManager) SignedURL(ctx context.Context, repo storage.Repository) (string, error) { relativePath := bundleRelativePath(repo, defaultBundle) - - repoProto, ok := repo.(*gitalypb.Repository) - if !ok { - return "", fmt.Errorf("unexpected repository type %t", repo) - } - - if tx := storage.ExtractTransaction(ctx); tx != nil { - origRepo := tx.OriginalRepository(repoProto) - relativePath = bundleRelativePath(origRepo, defaultBundle) - } - return g.sink.signedURL(ctx, relativePath) } diff --git a/internal/bundleuri/strategy_occurences.go b/internal/bundleuri/strategy_occurences.go index c35bb0abffe..647f13e1606 100644 --- a/internal/bundleuri/strategy_occurences.go +++ b/internal/bundleuri/strategy_occurences.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -30,15 +29,6 @@ func newEvaluateRequest(ctx context.Context, repo *localrepo.Repo, t time.Time, if !ok { return evaluateRequest{}, errors.New("expecting repo.Repository to be of type *gitalypb.Repository") } - - // A transaction re-writes the relative path to include the - // snapshot path. Using the snapshot path will not work here - // because we need a common key for each repository, not for - // each snapshot. 
- if tx := storage.ExtractTransaction(ctx); tx != nil { - repoProto = tx.OriginalRepository(repoProto) - } - return evaluateRequest{ ctx: ctx, repo: repo, diff --git a/internal/git/gitcmd/command_factory.go b/internal/git/gitcmd/command_factory.go index 78cb28aead6..680949f1e67 100644 --- a/internal/git/gitcmd/command_factory.go +++ b/internal/git/gitcmd/command_factory.go @@ -515,9 +515,6 @@ func (cf *ExecCommandFactory) newCommand(ctx context.Context, repo storage.Repos var cgroupsAddCommandOpts []cgroups.AddCommandOption if repo != nil { relativePath := repo.GetRelativePath() - if tx := storage.ExtractTransaction(ctx); tx != nil { - relativePath = tx.OriginalRepository(repo).GetRelativePath() - } cgroupsAddCommandOpts = []cgroups.AddCommandOption{ cgroups.WithCgroupKey(repo.GetStorageName() + "/" + relativePath), } diff --git a/internal/git/gitcmd/command_factory_cgroup_test.go b/internal/git/gitcmd/command_factory_cgroup_test.go index fedca49eeff..39dd3c0145b 100644 --- a/internal/git/gitcmd/command_factory_cgroup_test.go +++ b/internal/git/gitcmd/command_factory_cgroup_test.go @@ -15,7 +15,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type mockCgroupsManager struct { @@ -70,12 +69,7 @@ func TestNewCommandAddsToCgroup(t *testing.T) { // mockTransaction does nothing except allows setting the original repository type mockTransaction struct { storage.Transaction - originalRepo *gitalypb.Repository - fs storage.FS -} - -func (m *mockTransaction) OriginalRepository(storage.Repository) *gitalypb.Repository { - return m.originalRepo + fs storage.FS } func TestNewCommandCgroupStable(t *testing.T) { @@ -116,13 +110,11 @@ func TestNewCommandCgroupStable(t *testing.T) { require.NoError(t, err) defer cleanup() - originalRepo := &gitalypb.Repository{StorageName: "default", RelativePath: 
"some/relative/path"} locator := config.NewLocator(cfg) storagePath, err := locator.GetStorageByName(ctx, "default") require.NoError(t, err) ctx = storage.ContextWithTransaction(ctx, &mockTransaction{ - originalRepo: originalRepo, - fs: fsrecorder.NewFS(storagePath, nil), + fs: fsrecorder.NewFS(storagePath, nil), }) cmd, err := gitCmdFactory.New(ctx, repo, gitcmd.Command{ @@ -138,6 +130,6 @@ func TestNewCommandCgroupStable(t *testing.T) { require.NotNil(t, customFields) logrusFields := customFields.Fields() - require.Equal(t, originalRepo.GetStorageName()+"/"+originalRepo.GetRelativePath(), logrusFields["command.cgroup_path"]) + require.Equal(t, "default/"+repo.GetRelativePath(), logrusFields["command.cgroup_path"]) }) } diff --git a/internal/git/housekeeping/manager/optimize_repository.go b/internal/git/housekeeping/manager/optimize_repository.go index 9c652c3d822..c60906413d2 100644 --- a/internal/git/housekeeping/manager/optimize_repository.go +++ b/internal/git/housekeeping/manager/optimize_repository.go @@ -60,19 +60,11 @@ func (m *RepositoryManager) OptimizeRepository( defer span.Finish() if err := m.maybeStartTransaction(ctx, repo, func(ctx context.Context, tx storage.Transaction, repo *localrepo.Repo) error { - originalRepo := &gitalypb.Repository{ - StorageName: repo.GetStorageName(), - RelativePath: repo.GetRelativePath(), - } - if tx != nil { - originalRepo = tx.OriginalRepository(originalRepo) - } - // tryRunningHousekeeping acquires a lock on the repository to prevent other concurrent housekeeping calls on the repository. // As we may be in a transaction, the repository's relative path may have been rewritten. We use the original unrewritten relative // path here to ensure we hit the same key regardless if we run in different transactions where the snapshot prefixes in the // relative paths may differ. 
- ok, cleanup := m.repositoryStates.tryRunningHousekeeping(originalRepo) + ok, cleanup := m.repositoryStates.tryRunningHousekeeping(repo) // If we didn't succeed to set the state to "running" because of a concurrent housekeeping run // we exit early. if !ok { @@ -109,14 +101,6 @@ func (m *RepositoryManager) maybeStartTransaction(ctx context.Context, repo *loc } func (m *RepositoryManager) runInTransaction(ctx context.Context, transactionName string, readOnly bool, repo *localrepo.Repo, run func(context.Context, storage.Transaction, *localrepo.Repo) error) (returnedErr error) { - originalRepo := &gitalypb.Repository{ - StorageName: repo.GetStorageName(), - RelativePath: repo.GetRelativePath(), - } - if tx := storage.ExtractTransaction(ctx); tx != nil { - originalRepo = tx.OriginalRepository(originalRepo) - } - storageHandle, err := m.node.GetStorage(repo.GetStorageName()) if err != nil { return fmt.Errorf("get storage: %w", err) @@ -124,7 +108,7 @@ func (m *RepositoryManager) runInTransaction(ctx context.Context, transactionNam tx, err := storageHandle.Begin(ctx, storage.TransactionOptions{ ReadOnly: readOnly, - RelativePath: originalRepo.GetRelativePath(), + RelativePath: repo.GetRelativePath(), }) if err != nil { return fmt.Errorf("begin: %w", err) @@ -133,7 +117,12 @@ func (m *RepositoryManager) runInTransaction(ctx context.Context, transactionNam if err := run( storage.ContextWithTransaction(ctx, tx), tx, - localrepo.NewFrom(repo, tx.RewriteRepository(originalRepo)), + localrepo.NewFrom(repo, tx.RewriteRepository(&gitalypb.Repository{ + StorageName: repo.GetStorageName(), + GlRepository: repo.GetGlRepository(), + GlProjectPath: repo.GetGlProjectPath(), + RelativePath: repo.GetRelativePath(), + })), ); err != nil { if rollbackErr := tx.Rollback(ctx); rollbackErr != nil { err = errors.Join(err, fmt.Errorf("rollback: %w", rollbackErr)) diff --git a/internal/git/housekeeping/manager/optimize_repository_test.go 
b/internal/git/housekeeping/manager/optimize_repository_test.go index 0baedc20c43..90c57c582c3 100644 --- a/internal/git/housekeeping/manager/optimize_repository_test.go +++ b/internal/git/housekeeping/manager/optimize_repository_test.go @@ -32,7 +32,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/txinfo" - "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc/peer" ) @@ -1210,13 +1209,6 @@ func TestOptimizeRepository_ConcurrencyLimit(t *testing.T) { manager := New(gitalycfgprom.Config{}, testhelper.SharedLogger(t), nil, node) manager.optimizeFunc = func(ctx context.Context, repo *localrepo.Repo, _ housekeeping.OptimizationStrategy) error { relativePath := repo.GetRelativePath() - if tx := storage.ExtractTransaction(ctx); tx != nil { - relativePath = tx.OriginalRepository(&gitalypb.Repository{ - StorageName: repo.GetStorageName(), - RelativePath: repo.GetRelativePath(), - }).GetRelativePath() - } - reposOptimized[relativePath] = struct{}{} if relativePath == repoFirst.GetRelativePath() { diff --git a/internal/gitaly/hook/postreceive.go b/internal/gitaly/hook/postreceive.go index 85eea3935b8..6ed71f8fc08 100644 --- a/internal/gitaly/hook/postreceive.go +++ b/internal/gitaly/hook/postreceive.go @@ -130,9 +130,6 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. if err != nil { return fmt.Errorf("get transaction: %w", err) } - - originalRepo := tx.OriginalRepository(repo) - // The transaction may already be committed if the RPC invokes git-receive-pack(1) with the // proc-receive hook enabled. Ignore the error indicating that here. if commitLSN, err := tx.Commit(ctx); err != nil { @@ -145,14 +142,14 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. 
storage.LogTransactionCommit(ctx, m.logger.WithFields(payload.LogFields), commitLSN, "post-receive") } - storageHandle, err := m.node.GetStorage(originalRepo.GetStorageName()) + storageHandle, err := m.node.GetStorage(repo.GetStorageName()) if err != nil { return fmt.Errorf("get storage: %w", err) } tx, err = storageHandle.Begin(ctx, storage.TransactionOptions{ ReadOnly: true, - RelativePath: originalRepo.GetRelativePath(), + RelativePath: repo.GetRelativePath(), }) // A new transaction is created and it should be put in the context to replace (or hide) the old closed // one, so that `postReceiveHook` in the following logic can work on the correct transaction. @@ -166,8 +163,7 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. m.logger.WithError(err).Error("failed committing post-receive transaction") } }() - - repo = tx.RewriteRepository(originalRepo) + repo = tx.RewriteRepository(repo) } changes, err := io.ReadAll(stdin) diff --git a/internal/gitaly/hook/prereceive.go b/internal/gitaly/hook/prereceive.go index bf638e7b780..de84d775fdf 100644 --- a/internal/gitaly/hook/prereceive.go +++ b/internal/gitaly/hook/prereceive.go @@ -127,9 +127,12 @@ func (m *GitLabHookManager) preReceiveHook(ctx context.Context, payload gitcmd.H return structerr.NewInternal("protocol not set") } + rootStorage, _ := m.locator.GetRootStoragePathByName(repo.GetStorageName()) + relativePathToRoot, _ := filepath.Rel(rootStorage, repoPath) + params := gitlab.AllowedParams{ RepoPath: repoPath, - RelativePath: repo.GetRelativePath(), + RelativePath: relativePathToRoot, GitObjectDirectory: repo.GetGitObjectDirectory(), GitAlternateObjectDirectories: repo.GetGitAlternateObjectDirectories(), GLRepository: repo.GetGlRepository(), diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 9672890ab92..4c30a8cf4b4 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ 
b/internal/gitaly/service/hook/pack_objects.go @@ -117,16 +117,8 @@ func (s *server) packObjectsHook(ctx context.Context, req *gitalypb.PackObjectsH func (s *server) computeCacheKey(ctx context.Context, req *gitalypb.PackObjectsHookWithSidechannelRequest, stdinReader io.Reader) (string, io.ReadCloser, error) { cacheHash := sha256.New() - repository := req.GetRepository() - if tx := storage.ExtractTransaction(ctx); tx != nil { - // The cache uses the requests as the keys. As the request's repository in the RPC handler has been rewritten - // to point to the transaction's repository, the handler sees each request as different even if they point to - // the same repository. Restore the original request to ensure identical requests get the same key. - repository = tx.OriginalRepository(req.GetRepository()) - } - cacheKeyPrefix, err := protojson.Marshal(&gitalypb.PackObjectsHookWithSidechannelRequest{ - Repository: repository, + Repository: req.GetRepository(), Args: req.GetArgs(), GitProtocol: req.GetGitProtocol(), }) diff --git a/internal/gitaly/service/objectpool/create.go b/internal/gitaly/service/objectpool/create.go index bcec11a9a86..873f2dbfb22 100644 --- a/internal/gitaly/service/objectpool/create.go +++ b/internal/gitaly/service/objectpool/create.go @@ -63,7 +63,7 @@ func (s *server) CreateObjectPool(ctx context.Context, in *gitalypb.CreateObject if tx := storage.ExtractTransaction(ctx); tx != nil { if err := s.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(poolRepo).GetRelativePath(), + poolRepo.GetRelativePath(), ); err != nil { return nil, structerr.NewInternal("recording migration key: %w", err) } diff --git a/internal/gitaly/service/objectpool/delete.go b/internal/gitaly/service/objectpool/delete.go index e7bc8b31b1c..4614eadc32b 100644 --- a/internal/gitaly/service/objectpool/delete.go +++ b/internal/gitaly/service/objectpool/delete.go @@ -30,7 +30,7 @@ func (s *server) DeleteObjectPool(ctx context.Context, in *gitalypb.DeleteObject 
if tx := storage.ExtractTransaction(ctx); tx != nil { poolRepo := in.GetObjectPool().GetRepository() - if err := s.migrationStateManager.RecordKeyDeletion(tx, tx.OriginalRepository(poolRepo).GetRelativePath()); err != nil { + if err := s.migrationStateManager.RecordKeyDeletion(tx, poolRepo.GetRelativePath()); err != nil { return nil, structerr.NewInternal("recording migration key: %w", err) } } diff --git a/internal/gitaly/service/objectpool/fetch_into_object_pool.go b/internal/gitaly/service/objectpool/fetch_into_object_pool.go index 1c914c84990..b30aea7cd9c 100644 --- a/internal/gitaly/service/objectpool/fetch_into_object_pool.go +++ b/internal/gitaly/service/objectpool/fetch_into_object_pool.go @@ -33,10 +33,10 @@ func (s *server) FetchIntoObjectPool(ctx context.Context, req *gitalypb.FetchInt originalPoolRepo := objectPool.Repo if tx := storage.ExtractTransaction(ctx); tx != nil { - originalPoolRepo = s.localRepoFactory.Build(tx.OriginalRepository(&gitalypb.Repository{ + originalPoolRepo = s.localRepoFactory.Build(&gitalypb.Repository{ StorageName: req.GetObjectPool().GetRepository().GetStorageName(), RelativePath: req.GetObjectPool().GetRepository().GetRelativePath(), - })) + }) } // When transactions are enabled, housekeeping tasks are scheduled on the transaction (by operations diff --git a/internal/gitaly/service/objectpool/get.go b/internal/gitaly/service/objectpool/get.go index 4ae326fffc7..ed086579f89 100644 --- a/internal/gitaly/service/objectpool/get.go +++ b/internal/gitaly/service/objectpool/get.go @@ -5,7 +5,6 @@ import ( "errors" "gitlab.com/gitlab-org/gitaly/v16/internal/git/objectpool" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -31,11 +30,6 @@ func (s *server) GetObjectPool(ctx context.Context, in *gitalypb.GetObjectPoolRe } objectPoolProto := objectPool.ToProto() - if tx := storage.ExtractTransaction(ctx); tx != nil { 
- // The object pool's relative path is pointing to the transaction's snapshot. Return - // the original relative path in the response. - objectPoolProto.Repository = tx.OriginalRepository(objectPoolProto.GetRepository()) - } return &gitalypb.GetObjectPoolResponse{ ObjectPool: objectPoolProto, diff --git a/internal/gitaly/service/repository/create_fork.go b/internal/gitaly/service/repository/create_fork.go index 4ae75d73d92..f587ba48c42 100644 --- a/internal/gitaly/service/repository/create_fork.go +++ b/internal/gitaly/service/repository/create_fork.go @@ -111,7 +111,7 @@ func (s *server) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest if tx := storage.ExtractTransaction(ctx); tx != nil { if err := s.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(targetRepository).GetRelativePath(), + targetRepository.GetRelativePath(), ); err != nil { return nil, structerr.NewInternal("recording migration key: %w", err) } diff --git a/internal/gitaly/service/repository/create_repository.go b/internal/gitaly/service/repository/create_repository.go index 896dfed0773..50917930135 100644 --- a/internal/gitaly/service/repository/create_repository.go +++ b/internal/gitaly/service/repository/create_repository.go @@ -44,7 +44,7 @@ func (s *server) CreateRepository(ctx context.Context, req *gitalypb.CreateRepos if tx := storage.ExtractTransaction(ctx); tx != nil { if err := s.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(repository).GetRelativePath(), + repository.GetRelativePath(), ); err != nil { return nil, structerr.NewInternal("recording migration key: %w", err) } diff --git a/internal/gitaly/service/repository/create_repository_from_bundle.go b/internal/gitaly/service/repository/create_repository_from_bundle.go index 0306d13d6c7..e7617fcf18b 100644 --- a/internal/gitaly/service/repository/create_repository_from_bundle.go +++ b/internal/gitaly/service/repository/create_repository_from_bundle.go @@ -47,7 +47,7 @@ func (s 
*server) CreateRepositoryFromBundle(stream gitalypb.RepositoryService_Cr if tx := storage.ExtractTransaction(ctx); tx != nil { if err := s.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(repo).GetRelativePath(), + repo.GetRelativePath(), ); err != nil { return structerr.NewInternal("recording migration key: %w", err) } diff --git a/internal/gitaly/service/repository/create_repository_from_snapshot.go b/internal/gitaly/service/repository/create_repository_from_snapshot.go index ae6f56a2409..3fece7d970a 100644 --- a/internal/gitaly/service/repository/create_repository_from_snapshot.go +++ b/internal/gitaly/service/repository/create_repository_from_snapshot.go @@ -151,7 +151,7 @@ func (s *server) CreateRepositoryFromSnapshot(ctx context.Context, in *gitalypb. if tx := storage.ExtractTransaction(ctx); tx != nil { if err := s.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(repository).GetRelativePath(), + repository.GetRelativePath(), ); err != nil { return nil, structerr.NewInternal("recording migration key: %w", err) } diff --git a/internal/gitaly/service/repository/create_repository_from_url.go b/internal/gitaly/service/repository/create_repository_from_url.go index efa967ade9d..22ec67b3e58 100644 --- a/internal/gitaly/service/repository/create_repository_from_url.go +++ b/internal/gitaly/service/repository/create_repository_from_url.go @@ -132,7 +132,7 @@ func (s *server) CreateRepositoryFromURL(ctx context.Context, req *gitalypb.Crea if tx := storage.ExtractTransaction(ctx); tx != nil { if err := s.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(req.GetRepository()).GetRelativePath(), + req.GetRepository().GetRelativePath(), ); err != nil { return nil, structerr.NewInternal("recording migration key: %w", err) } diff --git a/internal/gitaly/service/repository/migrate_reference_backend.go b/internal/gitaly/service/repository/migrate_reference_backend.go index 9fe3bcbb2e4..9ae43c6206e 100644 --- 
a/internal/gitaly/service/repository/migrate_reference_backend.go +++ b/internal/gitaly/service/repository/migrate_reference_backend.go @@ -43,7 +43,7 @@ func (s *server) MigrateReferenceBackend( if err := migrator.Fn(ctx, tx, in.GetRepository().GetStorageName(), - tx.OriginalRepository(in.GetRepository()).GetRelativePath(), + in.GetRepository().GetRelativePath(), ); err != nil { return nil, structerr.NewInternal("migration failed: %w", err) } diff --git a/internal/gitaly/service/repository/remove.go b/internal/gitaly/service/repository/remove.go index 89fa1081cd7..f4675374f48 100644 --- a/internal/gitaly/service/repository/remove.go +++ b/internal/gitaly/service/repository/remove.go @@ -20,7 +20,7 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi } if tx := storage.ExtractTransaction(ctx); tx != nil { - if err := s.migrationStateManager.RecordKeyDeletion(tx, tx.OriginalRepository(repository).GetRelativePath()); err != nil { + if err := s.migrationStateManager.RecordKeyDeletion(tx, repository.GetRelativePath()); err != nil { return nil, structerr.NewInternal("recording migration key: %w", err) } } diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index 15be6138896..cdf92cfcd9c 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -143,7 +143,7 @@ func (s *server) ReplicateRepository(ctx context.Context, in *gitalypb.Replicate if err := migrator.Fn(ctx, tx, in.GetRepository().GetStorageName(), - tx.OriginalRepository(in.GetRepository()).GetRelativePath(), + in.GetRepository().GetRelativePath(), ); err != nil { return nil, structerr.NewInternal("migration failed: %w", err) } @@ -283,7 +283,7 @@ func (s *server) createFromSnapshot( if tx := storage.ExtractTransaction(ctx); tx != nil { if err := s.migrationStateManager.RecordKeyCreation( tx, - tx.OriginalRepository(target).GetRelativePath(), + 
target.GetRelativePath(), ); err != nil { return fmt.Errorf("recording migration key: %w", err) } diff --git a/internal/gitaly/service/smarthttp/cache.go b/internal/gitaly/service/smarthttp/cache.go index 154261bbc4d..2ce63f63672 100644 --- a/internal/gitaly/service/smarthttp/cache.go +++ b/internal/gitaly/service/smarthttp/cache.go @@ -9,11 +9,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "gitlab.com/gitlab-org/gitaly/v16/internal/cache" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" - "google.golang.org/protobuf/proto" ) type infoRefCache struct { @@ -60,14 +58,6 @@ func (c infoRefCache) tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest c.logger.DebugContext(ctx, "Attempting to fetch cached response") countAttempt() - if tx := storage.ExtractTransaction(ctx); tx != nil { - // The cache uses the requests as the keys. As the request's repository in the RPC handler has been rewritten - // to point to the transaction's repository, the handler sees each request as different even if they point to - // the same repository. Restore the original request to ensure identical requests get the same key. - in = proto.Clone(in).(*gitalypb.InfoRefsRequest) - in.Repository = tx.OriginalRepository(in.GetRepository()) - } - stream, err := c.streamer.GetStream(ctx, in.GetRepository(), in) switch { case err == nil: diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index d915c8654d1..1bda040e519 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -125,8 +125,6 @@ type Transaction interface { WriteCommitGraphs(housekeepingcfg.WriteCommitGraphConfig) // RewriteRepository rewrites the repository to point to the transaction's snapshot. 
RewriteRepository(*gitalypb.Repository) *gitalypb.Repository - // OriginalRepository returns the repository as it was before rewriting it to point to the snapshot. - OriginalRepository(Repository) *gitalypb.Repository // PartitionRelativePaths returns all known repository relative paths for // the transactions partition. PartitionRelativePaths() []string diff --git a/internal/gitaly/storage/storagemgr/middleware_test.go b/internal/gitaly/storage/storagemgr/middleware_test.go index 75359d583fa..36861f626cc 100644 --- a/internal/gitaly/storage/storagemgr/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/middleware_test.go @@ -353,7 +353,7 @@ messages and behavior by erroring out the requests before they even hit this int expected := validAdditionalRepository() require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. - testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) + testhelper.ProtoEqual(t, expected, actual) }, expectHandlerInvoked: true, }, @@ -385,7 +385,7 @@ messages and behavior by erroring out the requests before they even hit this int expected := validAdditionalRepository() require.Equal(t, expected.GetRelativePath(), actual.GetRelativePath()) // But the restored non-snapshotted repository should match the original. 
- testhelper.ProtoEqual(t, expected, storage.ExtractTransaction(ctx).OriginalRepository(actual)) + testhelper.ProtoEqual(t, expected, actual) }, expectHandlerInvoked: true, }, diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go index d9acbfb06b4..01898b2e3cb 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go @@ -37,8 +37,6 @@ func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, re return nil, fmt.Errorf("extract repository: %w", err) } - targetRepo = tx.OriginalRepository(targetRepo) - switch methodInfo.Operation { case protoregistry.OpAccessor: register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) @@ -88,8 +86,6 @@ func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, r return fmt.Errorf("extract repository: %w", err) } - targetRepo = tx.OriginalRepository(targetRepo) - switch methodInfo.Operation { case protoregistry.OpAccessor: register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 9e57041873f..59df0bae8d4 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -524,21 +524,6 @@ func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.R return rewritten } -// OriginalRepository returns the repository as it was before rewriting it to point to the snapshot. 
-func (txn *Transaction) OriginalRepository(repo storage.Repository) *gitalypb.Repository { - original := &gitalypb.Repository{ - StorageName: repo.GetStorageName(), - GlRepository: repo.GetGlRepository(), - GlProjectPath: repo.GetGlProjectPath(), - } - - original.RelativePath = strings.TrimPrefix(repo.GetRelativePath(), txn.snapshot.Prefix()+string(os.PathSeparator)) - original.GitObjectDirectory = "" - original.GitAlternateObjectDirectories = nil - - return original -} - func (txn *Transaction) updateState(newState transactionState) error { txn.stateLatch.Lock() defer txn.stateLatch.Unlock() diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go index 6dedeeb9732..4c2d0f4e8fa 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping.go @@ -848,21 +848,13 @@ func (mgr *TransactionManager) prepareOffloading(ctx context.Context, transactio // workingRepoPath is the current repository path which we are performing operations on. // In the context of transaction, workingRepoPath is a snapshot repository. workingRepoPath := mgr.getAbsolutePath(ctx, workingRepository.GetRelativePath()) - // Find the original repository's absolute path. In the context of transaction, originalRepo is the repo - // which we are taking a snapshot of. - originalRepo := &gitalypb.Repository{ - StorageName: workingRepository.GetStorageName(), - RelativePath: workingRepository.GetRelativePath(), - } - originalRepo = transaction.OriginalRepository(originalRepo) - // originalRepoAbsPath := mgr.getAbsolutePath(originalRepo.GetRelativePath()) // cfg.Prefix should be empty in production, which triggers automatic UUID generation. // Non-empty prefix values are only used in test environments. 
if cfg.Prefix == "" { offloadingID := uuid.New().String() // When uploading to offloading storage, use [original repo's relative path + UUID] as prefix - cfg.Prefix = filepath.Join(originalRepo.GetRelativePath(), offloadingID) + cfg.Prefix = filepath.Join(workingRepository.GetRelativePath(), offloadingID) } // Capture the list of pack-files before repacking. diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index bc85d9aa9ec..eea1deb9ceb 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1062,17 +1062,16 @@ func runMockMaintenanceServer(t *testing.T, cfg gconfig.Cfg) (*mockMaintenanceSe } func (m *mockMaintenanceServer) OptimizeRepository(ctx context.Context, in *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { - if tx := storage.ExtractTransaction(ctx); tx != nil { - in.Repository = tx.OriginalRepository(in.GetRepository()) - } - m.requestCh <- in return &gitalypb.OptimizeRepositoryResponse{}, nil } func (m *mockMaintenanceServer) PruneUnreachableObjects(ctx context.Context, in *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) { if tx := storage.ExtractTransaction(ctx); tx != nil { - in.Repository = tx.OriginalRepository(in.GetRepository()) + in.Repository = &gitalypb.Repository{ + StorageName: in.GetRepository().GetStorageName(), + RelativePath: in.GetRepository().GetRelativePath(), + } } m.requestCh <- in -- GitLab