From 9ebb3c1ccf2822f93e320eee4efe9fe423a01b24 Mon Sep 17 00:00:00 2001
From: Eric Ju
Date: Wed, 30 Jul 2025 13:17:43 -0400
Subject: [PATCH] migration: Reset migration state when there are disabled migrations

Currently, each repository's migration run is assigned a state with a
`doneCh` channel. When a migration is registered but disabled, its
state is initialized with a closed `doneCh`. As a result, the migration
framework treats it as completed, even though it was never actually
executed.

A temporary workaround is to restart Gitaly, which resets the internal
migration state and allows the migration to run.

This commit fixes the issue by removing the repository from the
migration state whenever a disabled migration is detected. This ensures
the migration can be picked up once it is enabled.

We also add logic to pre-check which migrations need to run before
starting a transaction, since transactions with snapshotting can be
expensive.
---
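Notes: the gating-and-reset pattern described above, reduced to a
minimal, self-contained Go sketch. The names here (gate, state,
migration, migrate) are simplified stand-ins rather than Gitaly's
actual types, and transactional recording of migration IDs is elided;
it only illustrates how the first transaction runs the chain, how
waiters block on doneCh, and how a disabled migration marks the state
for reset instead of leaving a "completed" entry behind.

package main

import (
	"context"
	"fmt"
	"sync"
)

type migration struct {
	ID         uint64
	IsDisabled func(context.Context) bool
	Run        func(context.Context) error
}

// state gates concurrent transactions: the first one runs the
// migrations, the rest wait on doneCh.
type state struct {
	doneCh chan struct{}
	err    error
}

type gate struct {
	mu     sync.Mutex
	states map[string]*state
}

func (g *gate) migrate(ctx context.Context, repo string, migrations []migration, lastID uint64) error {
	g.mu.Lock()
	if s, ok := g.states[repo]; ok {
		g.mu.Unlock()
		<-s.doneCh // another transaction is already migrating this repo
		return s.err
	}
	s := &state{doneCh: make(chan struct{})}
	g.states[repo] = s
	g.mu.Unlock()

	shouldReset := false
	defer func() {
		// Runs after close(s.doneCh) since defers are LIFO: waiters are
		// released first, then the entry is dropped so that the next
		// transaction re-evaluates the migration chain.
		if shouldReset {
			g.mu.Lock()
			delete(g.states, repo)
			g.mu.Unlock()
		}
	}()
	defer close(s.doneCh)

	// Pre-check before any expensive work: a disabled migration stops
	// the chain and marks the state for reset.
	var toRun []migration
	for _, m := range migrations {
		if m.ID <= lastID {
			continue
		}
		if m.IsDisabled != nil && m.IsDisabled(ctx) {
			shouldReset = true
			break
		}
		toRun = append(toRun, m)
	}

	for _, m := range toRun {
		if err := m.Run(ctx); err != nil {
			s.err = fmt.Errorf("run migration %d: %w", m.ID, err)
			return s.err
		}
	}
	return s.err
}

func main() {
	g := &gate{states: map[string]*state{}}
	disabled := func(context.Context) bool { return true }

	migrations := []migration{
		{ID: 1, Run: func(context.Context) error { fmt.Println("ran 1"); return nil }},
		{ID: 2, IsDisabled: disabled, Run: func(context.Context) error { fmt.Println("ran 2"); return nil }},
	}

	// Migration 1 runs; migration 2 is disabled, so the repo's state
	// entry is removed and the chain stays retriable.
	_ = g.migrate(context.Background(), "repo-a", migrations, 0)
	fmt.Println("state entries left:", len(g.states))
}

Running the sketch prints "ran 1" and then "state entries left: 0":
the disabled migration leaves the chain retriable rather than marking
the repository done for the partition's lifetime.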
 .../storagemgr/partition/migration/manager.go |  83 ++++--
 .../partition/migration/manager_test.go       | 244 +++++++++++++++++-
 2 files changed, 295 insertions(+), 32 deletions(-)

diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go
index c5d1c0692ac..eb83097e029 100644
--- a/internal/gitaly/storage/storagemgr/partition/migration/manager.go
+++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go
@@ -120,7 +120,18 @@ func (m *migrationManager) migrate(ctx context.Context, opts storage.BeginOption
 		mCtx = metadata.NewIncomingContext(mCtx, md)
 	}
 
-	if err := m.performMigrations(mCtx, opts); err != nil {
+	shouldResetState, err := m.performMigrations(mCtx, opts)
+	defer func() {
+		if shouldResetState {
+			// Reset the state in a defer block because we want other transactions to keep
+			// waiting while this transaction runs the migration tasks. Resetting the state
+			// too early may cause other transactions to try to run migrations concurrently.
+			m.mu.Lock()
+			delete(m.migrationStates, relativePath)
+			m.mu.Unlock()
+		}
+	}()
+	if err != nil {
 		// Record the error as part of the migration state so concurrent transactions are notified.
 		state.err = err
 		return fmt.Errorf("performing migrations: %w", err)
@@ -130,25 +141,52 @@
 }
 
 // performMigrations performs any missing migrations on a repository.
-func (m *migrationManager) performMigrations(ctx context.Context, opts storage.BeginOptions) (returnedErr error) {
+func (m *migrationManager) performMigrations(ctx context.Context, opts storage.BeginOptions) (shouldResetState bool, returnedErr error) {
 	relativePath := opts.RelativePaths[0]
 
 	id, err := m.getLastMigrationID(ctx, relativePath)
 	if errors.Is(err, storage.ErrRepositoryNotFound) {
 		// If the repository is not found pretend the repository is up-to-date with migrations and
 		// let the downstream transaction set the migration key during repository creation.
-		return nil
+		return shouldResetState, nil
 	} else if err != nil {
-		return fmt.Errorf("getting last migration: %w", err)
+		return shouldResetState, fmt.Errorf("getting last migration: %w", err)
 	}
 
 	// If the repository is already up-to-date, there is no need to start a transaction and perform
 	// migrations.
 	maxID := (*m.migrations)[len(*m.migrations)-1].ID
 	if id == maxID {
-		return nil
+		return shouldResetState, nil
 	} else if id > maxID {
-		return fmt.Errorf("repository has invalid migration key: %d", id)
+		return shouldResetState, fmt.Errorf("repository has invalid migration key: %d", id)
+	}
+
+	// Check which migrations need to run before starting a transaction, since transactions
+	// with snapshotting can be expensive. This pre-check eliminates disabled migrations and
+	// lets us return early when there is nothing to run.
+	migrationsToRun := make([]Migration, 0, len(*m.migrations))
+	for _, migration := range *m.migrations {
+		if id >= migration.ID {
+			continue
+		}
+
+		if migration.IsDisabled != nil && migration.IsDisabled(ctx) {
+			// A migration may have configuration allowing it to be disabled. As migrations are
+			// performed in order, if a disabled migration is encountered, the remaining migrations
+			// are also not executed. Since repository migrations are currently only attempted once
+			// per repository during the partition lifetime, a previously disabled migration would
+			// not be executed in the next transaction until the migration state is reset. To reset
+			// it, we remove the repository's relative path from the state map, making the migrations
+			// on this repository retriable; they will be checked again on the next transaction run.
+			shouldResetState = true
+			break
+		}
+		migrationsToRun = append(migrationsToRun, migration)
+	}
+	if len(migrationsToRun) == 0 {
+		m.logger.Info("no migrations to run")
+		return shouldResetState, nil
 	}
 
 	// Start a single transaction that records all outstanding migrations that get executed.
@@ -158,7 +196,7 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B
 		SkipPreventingReftableCompaction: true,
 	})
 	if err != nil {
-		return fmt.Errorf("begin migration update: %w", err)
+		return shouldResetState, fmt.Errorf("begin migration update: %w", err)
 	}
 	defer func() {
 		if returnedErr != nil {
@@ -168,40 +206,29 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B
 		}
 	}()
 
-	for _, migration := range *m.migrations {
+	for _, migration := range migrationsToRun {
 		timer := prometheus.NewTimer(m.metrics.latencyMetric.With(prometheus.Labels{
 			"migration_name": migration.Name,
 		}))
 
-		if id >= migration.ID {
-			continue
-		}
-
 		logger := m.logger.WithFields(log.Fields{
-			"migration_name": migration.Name,
-			"migration_id":   migration.ID,
-			"relative_path":  relativePath,
+			"migration_name":       migration.Name,
+			"migration_id":         migration.ID,
+			"relative_path":        relativePath,
+			"current_migration_id": id,
+			"max_migration_id":     maxID,
 		})
 
-		// A migration may have configuration allowing it to be disabled. As migrations are
-		// performed in order, if a disabled migration is encountered, the remaining migrations are
-		// also not executed. Since repository migrations are currently only attempted once for a
-		// repository during the partition lifetime, a previously disabled migration may not
-		// immediately be executed in the next transaction. Migration state must first be reset.
-		if migration.IsDisabled != nil && migration.IsDisabled(ctx) {
-			break
-		}
-
 		logger.Info("running migration")
 		if err := migration.run(ctx, txn, m.storageName, relativePath); err != nil {
-			return fmt.Errorf("run migration: %w", err)
+			return shouldResetState, fmt.Errorf("run migration: %w", err)
 		}
 
 		// If migration operations are successfully recorded, the last run migration ID is also recorded
 		// signifying it has been completed.
 		if err := migration.recordID(txn, relativePath); err != nil {
-			return fmt.Errorf("setting migration key: %w", err)
+			return shouldResetState, fmt.Errorf("setting migration key: %w", err)
 		}
 
 		duration := timer.ObserveDuration()
@@ -210,12 +237,12 @@
 
 	commitLSN, err := txn.Commit(ctx)
 	if err != nil {
-		return fmt.Errorf("commit migration update: %w", err)
+		return shouldResetState, fmt.Errorf("commit migration update: %w", err)
 	}
 
 	storage.LogTransactionCommit(ctx, m.logger, commitLSN, "migrator")
 
-	return nil
+	return shouldResetState, nil
 }
 
 // getLastMigrationID returns the ID of the last executed migration for a repository.
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
index a52ef5ed0d5..58638f08d73 100644
--- a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
@@ -113,10 +113,25 @@ func TestMigrationManager_Begin(t *testing.T) {
 			expectedLastID: 3,
 		},
 		{
-			desc:              "disabled migration",
-			migrations:        []Migration{migrationFn(1), {ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)}, migrationFn(3)},
-			startingMigration: &Migration{ID: 0},
-			expectedState:     &migrationState{},
+			desc: "all disabled migrations",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: disabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+			},
+			startingMigration: &Migration{ID: 0},
+			// We have a disabled migration, so the migrationManager should reset its state.
+			// Thus, no state should be stored under the repo's relative path.
+			expectedState:        nil,
+			expectedMigrationIDs: nil,
+			expectedLastID:       0,
+		},
+		{
+			desc:              "enabled migration followed by disabled ones",
+			migrations:        []Migration{migrationFn(1), {ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)}, migrationFn(3)},
+			startingMigration: &Migration{ID: 0},
+			// We have a disabled migration, so the migrationManager should reset its state.
+			// Thus, no state should be stored under the repo's relative path.
+			expectedState:        nil,
 			expectedMigrationIDs: map[uint64]struct{}{1: {}},
 			expectedLastID:       1,
 		},
@@ -482,6 +497,227 @@ func TestMigrationManager_Context(t *testing.T) {
 	require.True(t, called)
 }
 
+func TestMigrationManager_Disable_Then_Enable_Migrations(t *testing.T) {
+	// The test cases in this test run in sequence on purpose, to mimic transaction
+	// migration tasks being enabled one after another.
+
+	ctx := testhelper.Context(t)
+	cfg := testcfg.Build(t)
+
+	disabledFn := func(context.Context) bool { return true }
+	enabledFn := func(context.Context) bool { return false }
+	md := metadata.MD{"foo": []string{"bar"}}
+
+	recordingFn := func(id uint64) func(context.Context, storage.Transaction, string, string) error {
+		return func(ctx context.Context, txn storage.Transaction, _ string, _ string) error {
+			// Ensure that the context is carrying over metadata
+			// from the request's context.
+			actualMD, _ := metadata.FromIncomingContext(ctx)
+			assert.Equal(t, md, actualMD)
+
+			return txn.KV().Set(uint64ToBytes(id), nil)
+		}
+	}
+
+	repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+		SkipCreationViaService: true,
+	})
+	relativePath := repo.GetRelativePath()
+
+	testPartitionID := storage.PartitionID(1)
+	logger := testhelper.NewLogger(t)
+
+	storageName := cfg.Storages[0].Name
+	storagePath := cfg.Storages[0].Path
+
+	dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger)
+	require.NoError(t, err)
+	defer dbMgr.Close()
+
+	database, err := dbMgr.GetDB(storageName)
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, database)
+
+	stateDir := filepath.Join(storagePath, "state")
+	require.NoError(t, os.MkdirAll(stateDir, mode.Directory))
+
+	stagingDir := filepath.Join(storagePath, "staging")
+	require.NoError(t, os.Mkdir(stagingDir, mode.Directory))
+
+	cmdFactory := gittest.NewCommandFactory(t, cfg)
+	cache := catfile.NewCache(cfg)
+	defer cache.Stop()
+
+	repositoryFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, cache)
+
+	m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus))
+	raftNode, err := raftmgr.NewNode(cfg, logger, dbMgr, nil)
+	require.NoError(t, err)
+
+	raftFactory := raftmgr.DefaultFactoryWithNode(cfg.Raft, raftNode)
+
+	partitionFactoryOptions := []partition.FactoryOption{
+		partition.WithCmdFactory(cmdFactory),
+		partition.WithRepoFactory(repositoryFactory),
+		partition.WithMetrics(m),
+		partition.WithRaftConfig(cfg.Raft),
+		partition.WithRaftFactory(raftFactory),
+	}
+	factory := partition.NewFactory(partitionFactoryOptions...)
+	tm := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir)
+
+	ctx, cancel := context.WithCancel(ctx)
+
+	// loadedMigrations points to a different migration task group at each step to mimic
+	// tasks being enabled in sequence.
+	var loadedMigrations []Migration
+	mm := migrationManager{
+		ctx:             ctx,
+		cancelFn:        cancel,
+		Partition:       tm,
+		logger:          logger,
+		migrations:      &loadedMigrations,
+		metrics:         NewMetrics(),
+		migrationStates: map[string]*migrationState{},
+	}
+
+	managerErr := make(chan error)
+	go func() {
+		managerErr <- tm.Run()
+	}()
+
+	t.Cleanup(func() {
+		tm.Close()
+		require.NoError(t, tm.CloseSnapshots())
+		require.NoError(t, <-managerErr)
+	})
+
+	for _, seqTestCases := range []struct {
+		desc                 string
+		migrations           []Migration
+		expectedState        *migrationState
+		expectedMigrationIDs map[uint64]struct{}
+		expectedErr          error
+		expectedLastID       uint64
+	}{
+		{
+			desc: "step 1 with four disabled migrations",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: disabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: disabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: nil,
+			expectedLastID:       0,
+		},
+		{
+			desc: "step 2 with the first migration enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: disabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: map[uint64]struct{}{1: {}},
+			expectedLastID:       1, // first migration is applied, but second is not
+		},
+		{
+			desc: "step 3 with the last migration enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: enabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: map[uint64]struct{}{1: {}},
+			expectedLastID:       1, // first migration is applied, but second is not
+		},
+		{
+			desc: "step 4 with the second migration enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: enabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: enabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: map[uint64]struct{}{1: {}, 2: {}},
+			expectedLastID:       2,
+		},
+		{
+			desc: "step 5 with all migrations enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: enabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: enabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: enabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        &migrationState{},
+			expectedMigrationIDs: map[uint64]struct{}{1: {}, 2: {}, 3: {}, 4: {}},
+			expectedLastID:       4,
+		},
+	} {
+		t.Run(seqTestCases.desc, func(t *testing.T) {
+			loadedMigrations = seqTestCases.migrations
+			txn, err := tm.Begin(ctx, storage.BeginOptions{
+				Write:         true,
+				RelativePaths: []string{relativePath},
+			})
+			require.NoError(t, err)
+
+			_, err = txn.Commit(ctx)
+			require.NoError(t, err)
+
+			ctx = metadata.NewIncomingContext(ctx, md)
+
+			// Begin and commit a transaction through the migration manager to exercise the migration logic.
+			if txn, err := mm.Begin(ctx, storage.BeginOptions{
+				Write:         false,
+				RelativePaths: []string{relativePath},
+			}); err != nil {
+				require.ErrorContains(t, err, seqTestCases.expectedErr.Error())
+			} else {
+				require.NoError(t, err)
+
+				// In this test, each executed migration records its ID in the KV store. Validate
+				// that the expected migrations were performed.
+				for _, m := range seqTestCases.migrations {
+					_, expected := seqTestCases.expectedMigrationIDs[m.ID]
+					if _, err := txn.KV().Get(uint64ToBytes(m.ID)); err != nil {
+						require.ErrorIs(t, err, badger.ErrKeyNotFound)
+						require.False(t, expected)
+					} else {
+						require.NoError(t, err)
+						require.True(t, expected)
+					}
+				}
+
+				_, err := txn.Commit(ctx)
+				require.NoError(t, err)
+			}
+
+			if state, ok := mm.migrationStates[relativePath]; ok {
+				require.NotNil(t, seqTestCases.expectedState)
+				if seqTestCases.expectedState.err != nil {
+					require.ErrorContains(t, state.err, seqTestCases.expectedState.err.Error())
+				} else {
+					require.NoError(t, state.err)
+				}
+			} else {
+				require.Nil(t, seqTestCases.expectedState)
+				require.Empty(t, mm.migrationStates)
+			}
+
+			id, err := mm.getLastMigrationID(ctx, repo.GetRelativePath())
+			require.NoError(t, err)
+			require.Equal(t, seqTestCases.expectedLastID, id)
+		})
+	}
+}
+
 type mockPartition struct {
 	storagemgr.Partition
 	beginFn func(context.Context, storage.BeginOptions) (storage.Transaction, error)
-- 
GitLab
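The tests above key each recorded migration ID through a uint64ToBytes
helper that is defined elsewhere in the package and not shown in this
diff. For reference, a plausible shape for such a helper, assuming only
the standard library's encoding/binary, is sketched below; the real
implementation may differ.

package main

import (
	"encoding/binary"
	"fmt"
)

// Hypothetical reconstruction of the helper: a fixed-width, big-endian
// encoding keeps numeric order and Badger's lexicographic key order
// aligned.
func uint64ToBytes(id uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, id)
	return b
}

func main() {
	fmt.Printf("%x\n", uint64ToBytes(2)) // prints 0000000000000002
}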