diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go
index c5d1c0692ac13f66412c62196bab3740cc8e3ac2..eb83097e029c696ab6bc1c96ef7b82de7ebd0245 100644
--- a/internal/gitaly/storage/storagemgr/partition/migration/manager.go
+++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go
@@ -120,7 +120,18 @@ func (m *migrationManager) migrate(ctx context.Context, opts storage.BeginOption
 		mCtx = metadata.NewIncomingContext(mCtx, md)
 	}
 
-	if err := m.performMigrations(mCtx, opts); err != nil {
+	shouldResetState, err := m.performMigrations(mCtx, opts)
+	defer func() {
+		if shouldResetState {
+			// Put the state reset logic in a defer block because we want other transactions to
+			// wait while this transaction runs the migration tasks. Resetting the state too early
+			// may cause other transactions to try to run migrations concurrently.
+			m.mu.Lock()
+			delete(m.migrationStates, relativePath)
+			m.mu.Unlock()
+		}
+	}()
+	if err != nil {
 		// Record the error as part of the migration state so concurrent transactions are notified.
 		state.err = err
 		return fmt.Errorf("performing migrations: %w", err)
@@ -130,25 +141,52 @@ func (m *migrationManager) migrate(ctx context.Context, opts storage.BeginOption
 }
 
 // performMigrations performs any missing migrations on a repository.
-func (m *migrationManager) performMigrations(ctx context.Context, opts storage.BeginOptions) (returnedErr error) {
+func (m *migrationManager) performMigrations(ctx context.Context, opts storage.BeginOptions) (shouldResetState bool, returnedErr error) {
 	relativePath := opts.RelativePaths[0]
 
 	id, err := m.getLastMigrationID(ctx, relativePath)
 	if errors.Is(err, storage.ErrRepositoryNotFound) {
 		// If the repository is not found pretend the repository is up-to-date with migrations and
 		// let the downstream transaction set the migration key during repository creation.
-		return nil
+		return shouldResetState, nil
 	} else if err != nil {
-		return fmt.Errorf("getting last migration: %w", err)
+		return shouldResetState, fmt.Errorf("getting last migration: %w", err)
 	}
 
 	// If the repository is already up-to-date, there is no need to start a transaction and perform
 	// migrations.
 	maxID := (*m.migrations)[len(*m.migrations)-1].ID
 	if id == maxID {
-		return nil
+		return shouldResetState, nil
 	} else if id > maxID {
-		return fmt.Errorf("repository has invalid migration key: %d", id)
+		return shouldResetState, fmt.Errorf("repository has invalid migration key: %d", id)
+	}
+
+	// Check which migrations need to run. We do this before starting a transaction because
+	// transactions can be expensive due to snapshotting, and the pre-check also eliminates
+	// disabled migrations. If there are no migrations to run, return early.
+	migrationsToRun := make([]Migration, 0, len(*m.migrations))
+	for _, migration := range *m.migrations {
+		if id >= migration.ID {
+			continue
+		}
+
+		if migration.IsDisabled != nil && migration.IsDisabled(ctx) {
+			// A migration may have configuration allowing it to be disabled. As migrations are
+			// performed in order, if a disabled migration is encountered, the remaining migrations are
+			// also not executed. Since repository migrations are currently only attempted once for a
+			// repository during the partition lifetime, a previously disabled migration may not
+			// immediately be executed in the next transaction. Migration state must first be reset.
+			// To reset the migration state, we remove the repository's relative path from the state map.
+			// This makes the migrations for this repository retriable, so they are checked again on the next transaction run.
+			shouldResetState = true
+			break
+		}
+		migrationsToRun = append(migrationsToRun, migration)
+	}
+	if len(migrationsToRun) == 0 {
+		m.logger.Info("no migrations to run")
+		return shouldResetState, nil
 	}
 
 	// Start a single transaction that records all outstanding migrations that get executed.
@@ -158,7 +196,7 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B
 		SkipPreventingReftableCompaction: true,
 	})
 	if err != nil {
-		return fmt.Errorf("begin migration update: %w", err)
+		return shouldResetState, fmt.Errorf("begin migration update: %w", err)
 	}
 	defer func() {
 		if returnedErr != nil {
@@ -168,40 +206,29 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B
 		}
 	}()
 
-	for _, migration := range *m.migrations {
+	for _, migration := range migrationsToRun {
 		timer := prometheus.NewTimer(m.metrics.latencyMetric.With(prometheus.Labels{
 			"migration_name": migration.Name,
 		}))
 
-		if id >= migration.ID {
-			continue
-		}
-
 		logger := m.logger.WithFields(log.Fields{
-			"migration_name": migration.Name,
-			"migration_id":   migration.ID,
-			"relative_path":  relativePath,
+			"migration_name":       migration.Name,
+			"migration_id":         migration.ID,
+			"relative_path":        relativePath,
+			"current_migration_id": id,
+			"max_migration_id":     maxID,
 		})
 
-		// A migration may have configuration allowing it to be disabled. As migrations are
-		// performed in order, if a disabled migration is encountered, the remaining migrations are
-		// also not executed. Since repository migrations are currently only attempted once for a
-		// repository during the partition lifetime, a previously disabled migration may not
-		// immediately be executed in the next transaction. Migration state must first be reset.
-		if migration.IsDisabled != nil && migration.IsDisabled(ctx) {
-			break
-		}
-
 		logger.Info("running migration")
 
 		if err := migration.run(ctx, txn, m.storageName, relativePath); err != nil {
-			return fmt.Errorf("run migration: %w", err)
+			return shouldResetState, fmt.Errorf("run migration: %w", err)
 		}
 
 		// If migration operations are successfully recorded, the last run migration ID is also recorded
 		// signifying it has been completed.
 		if err := migration.recordID(txn, relativePath); err != nil {
-			return fmt.Errorf("setting migration key: %w", err)
+			return shouldResetState, fmt.Errorf("setting migration key: %w", err)
 		}
 
 		duration := timer.ObserveDuration()
@@ -210,12 +237,12 @@ func (m *migrationManager) performMigrations(ctx context.Context, opts storage.B
 
 	commitLSN, err := txn.Commit(ctx)
 	if err != nil {
-		return fmt.Errorf("commit migration update: %w", err)
+		return shouldResetState, fmt.Errorf("commit migration update: %w", err)
 	}
 
 	storage.LogTransactionCommit(ctx, m.logger, commitLSN, "migrator")
 
-	return nil
+	return shouldResetState, nil
 }
 
 // getLastMigrationID returns the ID of the last executed migration for a repository.
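The manager.go change above boils down to two ideas: collect the runnable migrations before opening a transaction, and clear the per-repository migration state in a defer once a disabled migration has been hit, so skipped migrations stay retriable. Below is a minimal, self-contained Go sketch of that pattern under simplified assumptions; Migration, manager, and preCheck here are illustrative stand-ins, not the actual Gitaly types.

package migrationsketch

import (
	"context"
	"sync"
)

// Migration is a simplified stand-in carrying only what the sketch needs.
type Migration struct {
	ID         uint64
	IsDisabled func(context.Context) bool
}

type manager struct {
	mu     sync.Mutex
	states map[string]struct{}
}

// migrate mirrors the control flow above: pre-check first, and clear the per-repository
// state only in a defer so concurrent callers keep waiting until this attempt finishes.
func (m *manager) migrate(ctx context.Context, relativePath string, lastID uint64, migrations []Migration) {
	shouldResetState, toRun := preCheck(ctx, lastID, migrations)
	defer func() {
		if shouldResetState {
			m.mu.Lock()
			delete(m.states, relativePath)
			m.mu.Unlock()
		}
	}()

	for range toRun {
		// Run each outstanding migration inside a single transaction here.
	}
}

// preCheck collects the migrations that still need to run and stops at the first disabled one,
// signalling that the state should be reset so the skipped migrations remain retriable.
func preCheck(ctx context.Context, lastID uint64, migrations []Migration) (bool, []Migration) {
	toRun := make([]Migration, 0, len(migrations))
	for _, migration := range migrations {
		if lastID >= migration.ID {
			continue
		}
		if migration.IsDisabled != nil && migration.IsDisabled(ctx) {
			return true, toRun
		}
		toRun = append(toRun, migration)
	}
	return false, toRun
}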
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
index a52ef5ed0d5f739309e4c846c508f952262d0eaf..58638f08d736cf4d760ce2a9b08f345adc0bc4bc 100644
--- a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
@@ -113,10 +113,25 @@ func TestMigrationManager_Begin(t *testing.T) {
 			expectedLastID:       3,
 		},
 		{
-			desc:              "disabled migration",
-			migrations:        []Migration{migrationFn(1), {ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)}, migrationFn(3)},
-			startingMigration: &Migration{ID: 0},
-			expectedState:     &migrationState{},
+			desc: "all disabled migrations",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: disabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+			},
+			startingMigration: &Migration{ID: 0},
+			// There is a disabled migration, so the migrationManager should reset its state.
+			// Thus, no state should be recorded under the repo's relative path.
+			expectedState:        nil,
+			expectedMigrationIDs: nil,
+			expectedLastID:       0,
+		},
+		{
+			desc:              "enabled migration followed by disabled ones",
+			migrations:        []Migration{migrationFn(1), {ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)}, migrationFn(3)},
+			startingMigration: &Migration{ID: 0},
+			// There is a disabled migration, so the migrationManager should reset its state.
+			// Thus, no state should be recorded under the repo's relative path.
+			expectedState:        nil,
 			expectedMigrationIDs: map[uint64]struct{}{1: {}},
 			expectedLastID:       1,
 		},
@@ -482,6 +497,227 @@ func TestMigrationManager_Context(t *testing.T) {
 	require.True(t, called)
 }
 
+func TestMigrationManager_Disable_Then_Enable_Migrations(t *testing.T) {
+	// The test cases are run in sequence on purpose to mimic the situation where
+	// transaction migration tasks are enabled one after another.
+
+	ctx := testhelper.Context(t)
+	cfg := testcfg.Build(t)
+
+	disabledFn := func(context.Context) bool { return true }
+	enabledFn := func(context.Context) bool { return false }
+	md := metadata.MD{"foo": []string{"bar"}}
+
+	recordingFn := func(id uint64) func(context.Context, storage.Transaction, string, string) error {
+		return func(ctx context.Context, txn storage.Transaction, _ string, _ string) error {
+			// Ensure that the context is carrying over metadata
+			// from the request's context.
+			actualMD, _ := metadata.FromIncomingContext(ctx)
+			assert.Equal(t, md, actualMD)
+
+			return txn.KV().Set(uint64ToBytes(id), nil)
+		}
+	}
+
+	repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+		SkipCreationViaService: true,
+	})
+	relativePath := repo.GetRelativePath()
+
+	testPartitionID := storage.PartitionID(1)
+	logger := testhelper.NewLogger(t)
+
+	storageName := cfg.Storages[0].Name
+	storagePath := cfg.Storages[0].Path
+
+	dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger)
+	require.NoError(t, err)
+	defer dbMgr.Close()
+
+	database, err := dbMgr.GetDB(storageName)
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, database)
+
+	stateDir := filepath.Join(storagePath, "state")
+	require.NoError(t, os.MkdirAll(stateDir, mode.Directory))
+
+	stagingDir := filepath.Join(storagePath, "staging")
+	require.NoError(t, os.Mkdir(stagingDir, mode.Directory))
+
+	cmdFactory := gittest.NewCommandFactory(t, cfg)
+	cache := catfile.NewCache(cfg)
+	defer cache.Stop()
+
+	repositoryFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, cache)
+
+	m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus))
+	raftNode, err := raftmgr.NewNode(cfg, logger, dbMgr, nil)
+	require.NoError(t, err)
+
+	raftFactory := raftmgr.DefaultFactoryWithNode(cfg.Raft, raftNode)
+
+	partitionFactoryOptions := []partition.FactoryOption{
+		partition.WithCmdFactory(cmdFactory),
+		partition.WithRepoFactory(repositoryFactory),
+		partition.WithMetrics(m),
+		partition.WithRaftConfig(cfg.Raft),
+		partition.WithRaftFactory(raftFactory),
+	}
+	factory := partition.NewFactory(partitionFactoryOptions...)
+	tm := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir)
+
+	ctx, cancel := context.WithCancel(ctx)
+
+	// loadedMigrations points to a different migration task group in each step to mimic the tasks being enabled in sequence.
+	var loadedMigrations []Migration
+	mm := migrationManager{
+		ctx:             ctx,
+		cancelFn:        cancel,
+		Partition:       tm,
+		logger:          logger,
+		migrations:      &loadedMigrations,
+		metrics:         NewMetrics(),
+		migrationStates: map[string]*migrationState{},
+	}
+
+	managerErr := make(chan error)
+	go func() {
+		managerErr <- tm.Run()
+	}()
+
+	t.Cleanup(func() {
+		tm.Close()
+		require.NoError(t, tm.CloseSnapshots())
+		require.NoError(t, <-managerErr)
+	})
+
+	for _, seqTestCases := range []struct {
+		desc                 string
+		migrations           []Migration
+		expectedState        *migrationState
+		expectedMigrationIDs map[uint64]struct{}
+		expectedErr          error
+		expectedLastID       uint64
+	}{
+		{
+			desc: "step 1 with four disabled migrations",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: disabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: disabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: nil,
+			expectedLastID:       0,
+		},
+		{
+			desc: "step 2 with the first migration enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: disabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: map[uint64]struct{}{1: {}},
+			expectedLastID:       1, // The first migration is applied, but the second is not.
+		},
+		{
+			desc: "step 3 with the last migration enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: disabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: enabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: map[uint64]struct{}{1: {}},
+			expectedLastID:       1, // The first migration is applied, but the second is not.
+		},
+		{
+			desc: "step 4 with the second migration enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: enabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: disabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: enabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        nil,
+			expectedMigrationIDs: map[uint64]struct{}{1: {}, 2: {}},
+			expectedLastID:       2,
+		},
+		{
+			desc: "step 5 with all migrations enabled",
+			migrations: []Migration{
+				{ID: 1, IsDisabled: enabledFn, Fn: recordingFn(1)},
+				{ID: 2, IsDisabled: enabledFn, Fn: recordingFn(2)},
+				{ID: 3, IsDisabled: enabledFn, Fn: recordingFn(3)},
+				{ID: 4, IsDisabled: enabledFn, Fn: recordingFn(4)},
+			},
+			expectedState:        &migrationState{},
+			expectedMigrationIDs: map[uint64]struct{}{1: {}, 2: {}, 3: {}, 4: {}},
+			expectedLastID:       4,
+		},
+	} {
+		t.Run(seqTestCases.desc, func(t *testing.T) {
+			loadedMigrations = seqTestCases.migrations
+			txn, err := tm.Begin(ctx, storage.BeginOptions{
+				Write:         true,
+				RelativePaths: []string{relativePath},
+			})
+			require.NoError(t, err)
+
+			_, err = txn.Commit(ctx)
+			require.NoError(t, err)
+
+			ctx = metadata.NewIncomingContext(ctx, md)
+
+			// Begin and commit a transaction through the migration manager to exercise the migration logic.
+			if txn, err := mm.Begin(ctx, storage.BeginOptions{
+				Write:         false,
+				RelativePaths: []string{relativePath},
+			}); err != nil {
+				require.ErrorContains(t, err, seqTestCases.expectedErr.Error())
+			} else {
+				require.NoError(t, err)
+
+				// In this test, each executed migration records its ID in the KV store. Validate
+				// that the expected migrations were performed.
+				for _, m := range seqTestCases.migrations {
+					_, expected := seqTestCases.expectedMigrationIDs[m.ID]
+					if _, err := txn.KV().Get(uint64ToBytes(m.ID)); err != nil {
+						require.ErrorIs(t, err, badger.ErrKeyNotFound)
+						require.False(t, expected)
+					} else {
+						require.NoError(t, err)
+						require.True(t, expected)
+					}
+				}
+
+				_, err := txn.Commit(ctx)
+				require.NoError(t, err)
+			}
+
+			if state, ok := mm.migrationStates[relativePath]; ok {
+				require.NotNil(t, seqTestCases.expectedState)
+				if seqTestCases.expectedState.err != nil {
+					require.ErrorContains(t, state.err, seqTestCases.expectedState.err.Error())
+				} else {
+					require.NoError(t, state.err)
+				}
+			} else {
+				require.Nil(t, seqTestCases.expectedState)
+				require.Empty(t, mm.migrationStates)
+			}
+
+			id, err := mm.getLastMigrationID(ctx, repo.GetRelativePath())
+			require.NoError(t, err)
+			require.Equal(t, seqTestCases.expectedLastID, id)
+		})
+	}
+}
+
 type mockPartition struct {
 	storagemgr.Partition
 	beginFn func(context.Context, storage.BeginOptions) (storage.Transaction, error)
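The test above drives IsDisabled with hard-coded disabledFn/enabledFn closures to simulate rolling migrations out step by step. In practice such a hook would typically be backed by some toggle carried on the request context. The sketch below shows one hypothetical way to wire that up; toggleKey, withToggle, and isDisabled are illustrative names, not Gitaly APIs.

package migrationsketch

import "context"

type toggleKey string

// withToggle stores a named on/off switch on the context.
func withToggle(ctx context.Context, name string, disabled bool) context.Context {
	return context.WithValue(ctx, toggleKey(name), disabled)
}

// isDisabled returns an IsDisabled-style hook that reads the switch back;
// toggles that were never set default to enabled (false).
func isDisabled(name string) func(context.Context) bool {
	return func(ctx context.Context) bool {
		disabled, _ := ctx.Value(toggleKey(name)).(bool)
		return disabled
	}
}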