From 0265f9f95f0e151e54fcae934d81df18159ee032 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 2 Jan 2025 13:07:32 +0700 Subject: [PATCH 1/4] storage: Unify TransactionManager creation via partition.Factory Currently, there are two ways to create an instance of the `TransactionManager`: use `partition.Factory` or create it directly. The former way is used in server setup, while the second one is used in the test only. This divergence adds a subtle but dangerous difference between the test setup and real production code, especially when we would like to add some more logic to the factory in upcoming commits. It also adds some verbosity to the test setup. This commit unifies the TransactionManager setup. `partition.Factory` is used universally in all occurrences in the test and the server setup. --- .../partition/migration/manager_test.go | 25 +++---------------- .../storagemgr/partition/testhelper_test.go | 13 +++++----- .../partition/transaction_manager_test.go | 17 +++++++------ 3 files changed, 19 insertions(+), 36 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go index c783bd0b353..29944a1357a 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go @@ -20,7 +20,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) @@ -156,28 +155,12 @@ func TestMigrationManager_Begin(t *testing.T) { cache := catfile.NewCache(cfg) defer cache.Stop() - repositoryFactory, err 
:= localrepo.NewFactory( - logger, config.NewLocator(cfg), cmdFactory, cache, - ).ScopeByStorage(ctx, cfg.Storages[0].Name) - require.NoError(t, err) + repositoryFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, cache) - m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)).Scope(storageName) - - logManager := log.NewManager(storageName, testPartitionID, stagingDir, stateDir, nil) - tm := partition.NewTransactionManager( - testPartitionID, - logger, - database, - storageName, - storagePath, - stateDir, - stagingDir, - cmdFactory, - repositoryFactory, - m, - logManager, - ) + m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)) + factory := partition.NewFactory(cmdFactory, repositoryFactory, m, nil) + tm := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir) mm := migrationManager{ Partition: tm, logger: logger, diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 28ae099b883..fa1b37b87ff 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -36,7 +36,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/protobuf/proto" @@ -1043,16 +1042,17 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas stagingDir := filepath.Join(storagePath, "staging") require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) - newMetrics := func() ManagerMetrics { - return 
NewMetrics(housekeeping.NewMetrics(setup.Config.Prometheus)).Scope(storageName) + newMetrics := func() Metrics { + return NewMetrics(housekeeping.NewMetrics(setup.Config.Prometheus)) } var ( // managerRunning tracks whether the manager is running or closed. managerRunning bool + // factory is the factory that produces the current TransactionManager + factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer) // transactionManager is the current TransactionManager instance. - logManager = log.NewManager(storageName, setup.PartitionID, stagingDir, stateDir, setup.Consumer) - transactionManager = NewTransactionManager(setup.PartitionID, logger, database, storageName, storagePath, stateDir, stagingDir, setup.CommandFactory, storageScopedFactory, newMetrics(), logManager) + transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) // managerErr is used for synchronizing manager closing and returning // the error from Run. 
managerErr chan error @@ -1101,8 +1101,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.NoError(t, os.RemoveAll(stagingDir)) require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) - logManager := log.NewManager(storageName, setup.PartitionID, stagingDir, stateDir, setup.Consumer) - transactionManager = NewTransactionManager(setup.PartitionID, logger, database, setup.Config.Storages[0].Name, storagePath, stateDir, stagingDir, setup.CommandFactory, storageScopedFactory, newMetrics(), logManager) + transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) installHooks(transactionManager, &inflightTransactions, step.Hooks) go func() { diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 45daec4e614..d0862b93818 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -2381,10 +2381,7 @@ func BenchmarkTransactionManager(b *testing.B) { managers []*TransactionManager ) - repositoryFactory, err := localrepo.NewFactory( - logger, config.NewLocator(cfg), cmdFactory, cache, - ).ScopeByStorage(ctx, cfg.Storages[0].Name) - require.NoError(b, err) + repositoryFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, cache) // transactionWG tracks the number of on going transaction. var transactionWG sync.WaitGroup @@ -2405,13 +2402,14 @@ func BenchmarkTransactionManager(b *testing.B) { stagingDir := filepath.Join(storagePath, "staging", strconv.Itoa(i)) require.NoError(b, os.MkdirAll(stagingDir, mode.Directory)) - m := NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)).Scope(storageName) + m := NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)) // Valid partition IDs are >=1. 
testPartitionID := storage.PartitionID(i + 1) - logManager := log.NewManager(storageName, testPartitionID, stagingDir, stateDir, nil) - manager := NewTransactionManager(testPartitionID, logger, database, storageName, storagePath, stateDir, stagingDir, cmdFactory, repositoryFactory, m, logManager) + factory := NewFactory(cmdFactory, repositoryFactory, m, nil) + // manager is the TransactionManager instance for this partition. + manager := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) managers = append(managers, manager) @@ -2421,7 +2419,10 @@ func BenchmarkTransactionManager(b *testing.B) { assert.NoError(b, manager.Run()) }() - objectHash, err := repositoryFactory.Build(repo.GetRelativePath()).ObjectHash(ctx) + scopedRepositoryFactory, err := repositoryFactory.ScopeByStorage(ctx, cfg.Storages[0].Name) + require.NoError(b, err) + + objectHash, err := scopedRepositoryFactory.Build(repo.GetRelativePath()).ObjectHash(ctx) require.NoError(b, err) for updaterID := 0; updaterID < tc.concurrentUpdaters; updaterID++ { -- GitLab From 85d70d883c7aa329c9d5e5bf608d68fa79e17cca Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 3 Jan 2025 16:04:59 +0700 Subject: [PATCH 2/4] storage: Generalize position tracking in LogManager This commit refactors `LogManager` to decouple position tracking from consumer acknowledgment, addressing the limitations in the existing interface. A dedicated `PositionTracker` is introduced to manage tracking positions universally, enabling future extensibility for additional consumers. The changes simplify `LogManager` by removing awareness of specific position types and standardizing the acknowledgment interface. This commit still keeps the Acknowledge*Position public methods. They will be removed in upcoming commits. 
--- internal/backup/log_entry_test.go | 5 +- internal/gitaly/storage/storage.go | 7 + .../storage/storagemgr/partition/factory.go | 9 +- .../storagemgr/partition/log/log_manager.go | 97 +++++-------- .../partition/log/log_manager_test.go | 102 +++++++++++--- .../storagemgr/partition/log/positions.go | 82 +++++++++++ .../partition/log/positions_test.go | 133 ++++++++++++++++++ .../partition/transaction_manager_test.go | 6 +- 8 files changed, 360 insertions(+), 81 deletions(-) create mode 100644 internal/gitaly/storage/storagemgr/partition/log/positions.go create mode 100644 internal/gitaly/storage/storagemgr/partition/log/positions_test.go diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index 70b2113e1b0..2404ba3d45d 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -607,7 +607,10 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { PartitionID: storage.PartitionID(i), } - logManager := log.NewManager(storageName, storage.PartitionID(i), testhelper.TempDir(t), testhelper.TempDir(t), archiver) + tracker := log.NewPositionTracker() + require.NoError(t, tracker.Register(log.ConsumerPosition)) + + logManager := log.NewManager(storageName, storage.PartitionID(i), testhelper.TempDir(t), testhelper.TempDir(t), archiver, tracker) require.NoError(t, logManager.Initialize(ctx, 0)) accessor.Lock() diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 4afbccf00a4..004dca43cfb 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -203,6 +203,13 @@ type LogManager interface { GetNotificationQueue() <-chan error } +// PositionType implements storage.LogPositionType. It's a specific type of position to be tracked in the +// Write-Ahead Log (WAL) tracking system. It defines whether changes to this position type should trigger notifications. 
+type PositionType struct { + Name string + ShouldNotify bool +} + // Partition is responsible for a single partition of data. type Partition interface { // Begin begins a transaction against the partition. diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 94aa40513d2..82a92808977 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -43,7 +43,14 @@ func (f Factory) New( panic(fmt.Errorf("building a partition for a non-existent storage: %q", storageName)) } - logManager := log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer) + positionTracker := log.NewPositionTracker() + if f.logConsumer != nil { + if err := positionTracker.Register(log.ConsumerPosition); err != nil { + panic(err) + } + } + + logManager := log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer, positionTracker) return NewTransactionManager( partitionID, logger, diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index f44046aad14..ee2e2e8188a 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -9,7 +9,6 @@ import ( "path/filepath" "runtime/trace" "sync" - "sync/atomic" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" @@ -26,34 +25,6 @@ func EntryPath(stateDir string, lsn storage.LSN) string { return filepath.Join(StatePath(stateDir), lsn.String()) } -type positionType int - -const ( - // appliedPosition keeps track of latest applied position. WAL could not prune a log entry if it has not been applied. - appliedPosition positionType = iota + 1 - // consumerPosition keeps track of the latest consumer acknowledgement. 
- consumerPosition -) - -// position tracks the last LSN acknowledged for of a particular type. -type position struct { - lsn atomic.Value -} - -func newPosition() *position { - p := position{} - p.setPosition(0) - return &p -} - -func (p *position) getPosition() storage.LSN { - return p.lsn.Load().(storage.LSN) -} - -func (p *position) setPosition(pos storage.LSN) { - p.lsn.Store(pos) -} - // Manager is responsible for managing the Write-Ahead Log (WAL) entries on disk. It maintains the in-memory state // and indexing system that reflect the functional state of the WAL. The Manager ensures safe and consistent // proposals, applications, and prunings of log entries, acting as the interface for transactional log operations. It @@ -100,31 +71,31 @@ type Manager struct { // consumer is an external caller that may perform read-only operations against applied log entries. Log entries // are retained until the consumer has acknowledged past their LSN. consumer storage.LogConsumer - // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL log - // entries are only pruned when they are not used anymore. - positions map[positionType]*position + // positionTracker tracks positions of log entries being used externally. Those positions are + // tracked so that WAL log entries are only pruned when they are not used anymore. + positionTracker *PositionTracker // notifyQueue is a queue notifying when there is a new change or there's something wrong with the log manager. notifyQueue chan error } // NewManager returns an instance of Manager. 
-func NewManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer storage.LogConsumer) *Manager { - positions := map[positionType]*position{ - appliedPosition: newPosition(), - } - if consumer != nil { - positions[consumerPosition] = newPosition() - } - +func NewManager( + storageName string, + partitionID storage.PartitionID, + stagingDirectory string, + stateDirectory string, + consumer storage.LogConsumer, + positionTracker *PositionTracker, +) *Manager { return &Manager{ - storageName: storageName, - partitionID: partitionID, - tmpDirectory: stagingDirectory, - stateDirectory: stateDirectory, - consumer: consumer, - positions: positions, - notifyQueue: make(chan error, 1), + storageName: storageName, + partitionID: partitionID, + tmpDirectory: stagingDirectory, + stateDirectory: stateDirectory, + consumer: consumer, + positionTracker: positionTracker, + notifyQueue: make(chan error, 1), } } @@ -187,24 +158,30 @@ func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) erro // AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. func (mgr *Manager) AcknowledgeAppliedPosition(lsn storage.LSN) { - mgr.positions[appliedPosition].setPosition(lsn) - mgr.pruneLogEntries() + _ = mgr.AcknowledgePosition(AppliedPosition, lsn) } // AcknowledgeConsumerPosition acknowledges log entries up and including LSN as successfully processed for the specified // LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction. func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { - if mgr.consumer == nil { - panic("log manager's consumer must be present prior to AcknowledgeConsumerPos call") + _ = mgr.AcknowledgePosition(ConsumerPosition, lsn) +} + +// AcknowledgePosition acknowledges the position of a position type. 
+func (mgr *Manager) AcknowledgePosition(t storage.PositionType, lsn storage.LSN) error { + if err := mgr.positionTracker.Set(t.Name, lsn); err != nil { + return fmt.Errorf("acknowledge position: %w", err) } - mgr.positions[consumerPosition].setPosition(lsn) - mgr.pruneLogEntries() // Alert the outsider. If it has a pending acknowledgement already no action is required. - select { - case mgr.notifyQueue <- nil: - default: + if t.ShouldNotify { + select { + case mgr.notifyQueue <- nil: + default: + } } + mgr.pruneLogEntries() + return nil } // GetNotificationQueue returns a notify channel so that caller can poll new changes. @@ -420,11 +397,11 @@ func (mgr *Manager) lowWaterMark() storage.LSN { // Position is the last acknowledged LSN, this is eligible for pruning. // lowWaterMark returns the lowest LSN that cannot be pruned, so add one. - for _, pos := range mgr.positions { - if pos.getPosition()+1 < minAcknowledged { - minAcknowledged = pos.getPosition() + 1 + mgr.positionTracker.Each(func(_ string, p storage.LSN) { + if p+1 < minAcknowledged { + minAcknowledged = p + 1 } - } + }) return minAcknowledged } diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 5df69f2deaa..2e8dfd3ecaa 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -29,8 +29,16 @@ func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) sto return nextLSN } +func newTracker(t *testing.T, consumer storage.LogConsumer) *PositionTracker { + tracker := NewPositionTracker() + if consumer != nil { + require.NoError(t, tracker.Register(ConsumerPosition)) + } + return tracker +} + func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogConsumer) *Manager { - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), 
consumer) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer, newTracker(t, consumer)) require.NoError(t, logManager.Initialize(ctx, 0)) return logManager @@ -56,7 +64,7 @@ func TestLogManager_Initialize(t *testing.T) { ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) waitUntilPruningFinish(t, logManager) @@ -78,13 +86,13 @@ func TestLogManager_Initialize(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) - logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) waitUntilPruningFinish(t, logManager) @@ -105,11 +113,12 @@ func TestLogManager_Initialize(t *testing.T) { t.Run("existing WAL entries with appliedLSN in-between", func(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { @@ -117,7 +126,7 @@ func TestLogManager_Initialize(t *testing.T) { } require.NoError(t, logManager.Close()) - logManager = 
NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 2)) waitUntilPruningFinish(t, logManager) @@ -140,7 +149,7 @@ func TestLogManager_Initialize(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { @@ -148,7 +157,7 @@ func TestLogManager_Initialize(t *testing.T) { } require.NoError(t, logManager.Close()) - logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 3)) waitUntilPruningFinish(t, logManager) @@ -169,7 +178,7 @@ func TestLogManager_Initialize(t *testing.T) { ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) // Attempt to initialize again @@ -186,7 +195,7 @@ func TestLogManager_Initialize(t *testing.T) { cancel() // Cancel the context before initializing stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) err := logManager.Initialize(ctx, 0) require.Error(t, err) @@ -198,7 +207,7 @@ func TestLogManager_Initialize(t *testing.T) { ctx, cancel := context.WithCancel(testhelper.Context(t)) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, 
testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) // Cancel the context after initialization @@ -349,7 +358,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 5; i++ { @@ -386,7 +395,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Restart the manager - logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 5)) waitUntilPruningFinish(t, logManager) @@ -402,7 +411,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) var wg sync.WaitGroup @@ -557,7 +566,7 @@ func TestLogManager_Positions(t *testing.T) { // Before restart mockConsumer := &mockLogConsumer{} - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer, newTracker(t, mockConsumer)) require.NoError(t, logManager.Initialize(ctx, 0)) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) @@ -585,7 +594,7 @@ func TestLogManager_Positions(t *testing.T) { // Restart the log consumer. 
mockConsumer = &mockLogConsumer{} - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 2)) // Notify consumer to consume from 2 -> 4 @@ -741,6 +750,63 @@ func TestLogManager_Positions(t *testing.T) { "/wal": {Mode: mode.Directory}, }) }) + + t.Run("more position types apart from defaults are supported", func(t *testing.T) { + consumer := &mockLogConsumer{} + + tracker := newTracker(t, consumer) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer, tracker) + + t1 := storage.PositionType{Name: "TestPosition1", ShouldNotify: false} + t2 := storage.PositionType{Name: "TestPosition2", ShouldNotify: false} + require.NoError(t, tracker.Register(t1)) + require.NoError(t, tracker.Register(t2)) + + require.NoError(t, logManager.Initialize(ctx, 0)) + + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) + + // Consumed = 3, Applied = 2, TestPosition1 = 0, TestPosition2 = 0 (not acknowledged yet) + simulatePositions(t, logManager, 3, 2) + + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + // Consumed = 3, Applied = 3, TestPosition1 = 2, TestPosition2 = 2 + require.NoError(t, 
logManager.AcknowledgePosition(t1, 2)) + require.NoError(t, logManager.AcknowledgePosition(t2, 2)) + simulatePositions(t, logManager, 3, 3) + + require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + // All positions are 3 + require.NoError(t, logManager.AcknowledgePosition(t1, 3)) + require.NoError(t, logManager.AcknowledgePosition(t2, 3)) + simulatePositions(t, logManager, 3, 3) + + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) } func TestLogManager_Close(t *testing.T) { @@ -748,7 +814,7 @@ func TestLogManager_Close(t *testing.T) { t.Run("close uninitialized manager", func(t *testing.T) { t.Parallel() - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil, newTracker(t, nil)) // Attempt to close the manager before initialization err := logManager.Close() @@ -759,7 +825,7 @@ func TestLogManager_Close(t *testing.T) { t.Run("close after initialization", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil, newTracker(t, nil)) // Properly initialize the manager require.NoError(t, logManager.Initialize(ctx, 0)) diff --git a/internal/gitaly/storage/storagemgr/partition/log/positions.go b/internal/gitaly/storage/storagemgr/partition/log/positions.go new file mode 100644 index 00000000000..842f80188d0 --- /dev/null +++ 
b/internal/gitaly/storage/storagemgr/partition/log/positions.go @@ -0,0 +1,82 @@ +// positions.go +package log + +import ( + "fmt" + "sync/atomic" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +var ( + // AppliedPosition keeps track of the latest applied position. WAL cannot prune a log entry if it has not been applied. + AppliedPosition = storage.PositionType{Name: "AppliedPosition", ShouldNotify: false} + // ConsumerPosition keeps track of the latest consumer acknowledgment. + ConsumerPosition = storage.PositionType{Name: "ConsumerPosition", ShouldNotify: true} +) + +// position tracks the last LSN acknowledged for a particular type. +type position struct { + atomic.Value +} + +func newPosition() *position { + p := position{} + p.setPosition(0) + return &p +} + +func (p *position) getPosition() storage.LSN { + return p.Load().(storage.LSN) +} + +func (p *position) setPosition(pos storage.LSN) { + p.Store(pos) +} + +// PositionTracker manages positions for various position types. +type PositionTracker struct { + positions map[string]*position +} + +// NewPositionTracker creates and initializes a new PositionTracker. +func NewPositionTracker() *PositionTracker { + return &PositionTracker{ + positions: map[string]*position{ + AppliedPosition.Name: newPosition(), + }, + } +} + +// Register adds a new position type to the tracker. +func (p *PositionTracker) Register(t storage.PositionType) error { + if _, exist := p.positions[t.Name]; exist { + return fmt.Errorf("position type %q already registered", t.Name) + } + p.positions[t.Name] = newPosition() + return nil +} + +// Set updates the position for a given type. +func (p *PositionTracker) Set(t string, lsn storage.LSN) error { + if _, exist := p.positions[t]; !exist { + return fmt.Errorf("acknowledged an unregistered position type %q", t) + } + p.positions[t].setPosition(lsn) + return nil +} + +// Get retrieves the position for a given type. 
+func (p *PositionTracker) Get(t string) (storage.LSN, error) { + if _, exist := p.positions[t]; !exist { + return 0, fmt.Errorf("acknowledged an unregistered position type %q", t) + } + return p.positions[t].getPosition(), nil +} + +// Each iterates through the list of tracked positions and yields the callback with corresponding LSN. +func (p *PositionTracker) Each(callback func(string, storage.LSN)) { + for t, pos := range p.positions { + callback(t, pos.getPosition()) + } +} diff --git a/internal/gitaly/storage/storagemgr/partition/log/positions_test.go b/internal/gitaly/storage/storagemgr/partition/log/positions_test.go new file mode 100644 index 00000000000..e26457779af --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log/positions_test.go @@ -0,0 +1,133 @@ +package log + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +func TestPositionTracker(t *testing.T) { + t.Parallel() + + t.Run("Set and Get AppliedPosition and ConsumerPosition", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + require.NoError(t, tracker.Set(AppliedPosition.Name, storage.LSN(5))) + + pos, err := tracker.Get(AppliedPosition.Name) + require.NoError(t, err) + require.Equal(t, storage.LSN(5), pos) + + require.NoError(t, tracker.Register(ConsumerPosition)) + require.NoError(t, tracker.Set(ConsumerPosition.Name, storage.LSN(10))) + + pos, err = tracker.Get(ConsumerPosition.Name) + require.NoError(t, err) + require.Equal(t, storage.LSN(10), pos) + }) + + t.Run("Set and Get single position multiple times", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + testPosition := storage.PositionType{Name: "TestPosition", ShouldNotify: false} + require.NoError(t, tracker.Register(testPosition)) + + for i := 1; i <= 3; i++ { + require.NoError(t, tracker.Set(testPosition.Name, storage.LSN(i))) + pos, err := tracker.Get(testPosition.Name) + require.NoError(t, 
err) + require.Equal(t, storage.LSN(i), pos) + } + }) + + t.Run("Set and Get multiple positions", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + positions := []storage.PositionType{ + {Name: "Position1", ShouldNotify: false}, + {Name: "Position2", ShouldNotify: true}, + } + values := []storage.LSN{5, 10} + + for i, posType := range positions { + require.NoError(t, tracker.Register(posType)) + require.NoError(t, tracker.Set(posType.Name, values[i])) + } + + for i, posType := range positions { + pos, err := tracker.Get(posType.Name) + require.NoError(t, err) + require.Equal(t, values[i], pos) + } + }) + + t.Run("Double register position type", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + testPosition := storage.PositionType{Name: "TestPosition", ShouldNotify: false} + require.NoError(t, tracker.Register(testPosition)) + err := tracker.Register(testPosition) + require.EqualError(t, err, "position type \"TestPosition\" already registered") + }) + + t.Run("Duplicated name register", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + testPosition1 := storage.PositionType{Name: "TestPosition", ShouldNotify: false} + testPosition2 := storage.PositionType{Name: "TestPosition", ShouldNotify: true} + + require.NoError(t, tracker.Register(testPosition1)) + err := tracker.Register(testPosition2) + require.EqualError(t, err, "position type \"TestPosition\" already registered") + }) + + t.Run("Ack unregistered position", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + posType := storage.PositionType{Name: "Unregistered", ShouldNotify: false} + + err := tracker.Set(posType.Name, storage.LSN(1)) + require.EqualError(t, err, "acknowledged an unregistered position type \"Unregistered\"") + }) + + t.Run("Get unregistered position", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + posType := storage.PositionType{Name: "Unregistered", ShouldNotify: false} + + 
_, err := tracker.Get(posType.Name) + require.EqualError(t, err, "acknowledged an unregistered position type \"Unregistered\"") + }) + + t.Run("Range over positions", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + positions := []storage.PositionType{ + {Name: "Position1", ShouldNotify: false}, + {Name: "Position2", ShouldNotify: true}, + } + values := []storage.LSN{5, 10} + + for i, posType := range positions { + require.NoError(t, tracker.Register(posType)) + require.NoError(t, tracker.Set(posType.Name, values[i])) + } + + trackedPositions := map[string]storage.LSN{} + tracker.Each(func(name string, lsn storage.LSN) { + trackedPositions[name] = lsn + }) + + require.Equal(t, storage.LSN(5), trackedPositions["Position1"]) + require.Equal(t, storage.LSN(10), trackedPositions["Position2"]) + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index d0862b93818..ccf028305a9 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -2257,7 +2257,11 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t require.NoError(t, err) require.NoError(t, os.WriteFile(manifestPath(logEntryPath), manifestBytes, mode.File)) - logManager := log.NewManager(tm.storageName, setup.PartitionID, testhelper.TempDir(t), filepath.Join(tm.storagePath, "state"), setup.Consumer) + tracker := log.NewPositionTracker() + if setup.Consumer != nil { + require.NoError(t, tracker.Register(log.ConsumerPosition)) + } + logManager := log.NewManager(tm.storageName, setup.PartitionID, testhelper.TempDir(t), filepath.Join(tm.storagePath, "state"), setup.Consumer, tracker) require.NoError(t, logManager.Initialize(ctx, 3)) lsn, err := logManager.AppendLogEntry(logEntryPath) require.NoError(t, err) -- GitLab From 
d69efa62fa5d485881ca6ee865e6ec48234a245a Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 6 Jan 2025 10:20:29 +0700 Subject: [PATCH 3/4] storage: Move applied position acknowledgement out of log.Manager Currently, log.Manager sets the applied position and consumed position after initialization. This task should belong to the caller. log.Manager should initialize the LSNs to the safest position, which is the earliest log entry existing on disk. Callers are free to update the positions afterwards. --- .../storage/storagemgr/partition/log/log_manager.go | 10 ++++++---- .../storagemgr/partition/log/log_manager_test.go | 11 +++++++++-- .../storagemgr/partition/transaction_manager.go | 1 + .../partition/transaction_manager_consumer_test.go | 4 ---- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index ee2e2e8188a..c0964210b8a 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -145,13 +145,15 @@ func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) erro } } + mgr.positionTracker.Each(func(t string, _ storage.LSN) { + // Set acknowledged position to oldestLSN - 1. If the position were set to 0, the consumer would be unable + // to read the pruned entries anyway. + _ = mgr.positionTracker.Set(t, mgr.oldestLSN-1) + }) + if mgr.consumer != nil && mgr.appendedLSN != 0 { - // Set acknowledged position to oldestLSN - 1 and notify the consumer from oldestLSN -> appendedLSN. - // If set the position to 0, the consumer is unable to read pruned entry anyway. 
- mgr.AcknowledgeConsumerPosition(mgr.oldestLSN - 1) mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) } - mgr.AcknowledgeAppliedPosition(appliedLSN) return nil } diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 2e8dfd3ecaa..8cc13398b0b 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -129,6 +129,8 @@ func TestLogManager_Initialize(t *testing.T) { logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 2)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) + waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(3), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) @@ -159,6 +161,7 @@ func TestLogManager_Initialize(t *testing.T) { logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 3)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 3)) waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(4), logManager.oldestLSN) @@ -277,12 +280,15 @@ func TestLogManager_PruneLogEntries(t *testing.T) { appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } + // Set the applied LSN to 2 + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) + // Manually set the consumer's position to the first entry, forcing low-water mark to retain it + require.NoError(t, logManager.AcknowledgePosition(ConsumerPosition, 1)) + // Before removal assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: 
mode.Directory}, - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, "/wal/0000000000002": {Mode: mode.Directory}, "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, "/wal/0000000000003": {Mode: mode.Directory}, @@ -397,6 +403,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { // Restart the manager logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 5)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 5)) waitUntilPruningFinish(t, logManager) testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 05a41952df1..5d2ec476485 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -2355,6 +2355,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { if err := mgr.logManager.Initialize(ctx, mgr.appliedLSN); err != nil { return fmt.Errorf("initialize log management: %w", err) } + mgr.logManager.AcknowledgeAppliedPosition(mgr.appliedLSN) if err := os.Mkdir(mgr.snapshotsDir(), mode.Directory); err != nil { return fmt.Errorf("create snapshot manager directory: %w", err) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index 37246625020..e29c0d51cf4 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -379,10 +379,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti }, CloseManager{}, StartManager{}, - 
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - // Wait until the first acknowledgement after restart - <-tm.logManager.GetNotificationQueue() - }), }, expectedState: StateAssertion{ Database: DatabaseState{ -- GitLab From beba0d3231a5208e08cd56fbbeda1fa207d05700 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 6 Jan 2025 10:20:37 +0700 Subject: [PATCH 4/4] storage: Unify Acknowledge*Position interfaces In prior commits, the new unified AcknowledgePosition interface was introduced. However, the existing AcknowledgeConsumerPosition and AcknowledgeAppliedPosition were kept to avoid changes on the caller side. This commit finishes off the refactoring commit series. The aforementioned specific methods are removed in favor of the unified interface. --- internal/backup/log_entry.go | 32 +++++++++++-------- internal/backup/log_entry_test.go | 3 +- internal/gitaly/storage/storage.go | 12 ++----- .../storagemgr/partition/log/log_manager.go | 11 ------- .../partition/log/log_manager_test.go | 22 ++++++------- .../storagemgr/partition/testhelper_test.go | 3 +- .../partition/transaction_manager.go | 9 ++++-- 7 files changed, 43 insertions(+), 49 deletions(-) diff --git a/internal/backup/log_entry.go b/internal/backup/log_entry.go index d347a644888..6411501124e 100644 --- a/internal/backup/log_entry.go +++ b/internal/backup/log_entry.go @@ -13,8 +13,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/archive" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) const ( @@ -88,7 +89,7 @@ type PartitionInfo struct { // an exponential backoff. type LogEntryArchiver struct { // logger is the logger to use to write log messages. 
- logger log.Logger + logger logging.Logger // store is where the log archives are kept. store LogEntryStore // node is used to access the LogManagers. @@ -130,12 +131,12 @@ type LogEntryArchiver struct { } // NewLogEntryArchiver constructs a new LogEntryArchiver. -func NewLogEntryArchiver(logger log.Logger, archiveSink *Sink, workerCount uint, node *storage.Node) *LogEntryArchiver { +func NewLogEntryArchiver(logger logging.Logger, archiveSink *Sink, workerCount uint, node *storage.Node) *LogEntryArchiver { return newLogEntryArchiver(logger, archiveSink, workerCount, node, helper.NewTimerTicker) } // newLogEntryArchiver constructs a new LogEntryArchiver with a configurable ticker function. -func newLogEntryArchiver(logger log.Logger, archiveSink *Sink, workerCount uint, node *storage.Node, tickerFunc func(time.Duration) helper.Ticker) *LogEntryArchiver { +func newLogEntryArchiver(logger logging.Logger, archiveSink *Sink, workerCount uint, node *storage.Node, tickerFunc func(time.Duration) helper.Ticker) *LogEntryArchiver { if workerCount < 1 { workerCount = 1 } @@ -294,8 +295,8 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { // We have already backed up all entries sent by the LogManager, but the manager is // not aware of this. Acknowledge again with our last processed entry. if state.nextLSN > notification.highWaterMark { - if err := la.callLogReader(ctx, notification.partitionInfo, func(lm storage.LogReader) { - lm.AcknowledgeConsumerPosition(state.nextLSN - 1) + if err := la.callLogReader(ctx, notification.partitionInfo, func(lm storage.LogReader) error { + return lm.AcknowledgePosition(log.ConsumerPosition, state.nextLSN-1) }); err != nil { la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for already completed entry") } @@ -306,7 +307,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { // we will be unable to backup the full sequence. 
if state.nextLSN < notification.lowWaterMark { la.logger.WithFields( - log.Fields{ + logging.Fields{ "storage": notification.partitionInfo.StorageName, "partition_id": notification.partitionInfo.PartitionID, "expected_lsn": state.nextLSN, @@ -328,7 +329,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { } } -func (la *LogEntryArchiver) callLogReader(ctx context.Context, partitionInfo PartitionInfo, callback func(lm storage.LogReader)) error { +func (la *LogEntryArchiver) callLogReader(ctx context.Context, partitionInfo PartitionInfo, callback func(lm storage.LogReader) error) error { storageHandle, err := (*la.node).GetStorage(partitionInfo.StorageName) if err != nil { return fmt.Errorf("get storage: %w", err) @@ -340,7 +341,9 @@ func (la *LogEntryArchiver) callLogReader(ctx context.Context, partitionInfo Par } defer partition.Close() - callback(partition.GetLogReader()) + if err := callback(partition.GetLogReader()); err != nil { + return fmt.Errorf("acknowledge consumer position: %w", err) + } return nil } @@ -376,11 +379,11 @@ func (la *LogEntryArchiver) receiveEntry(ctx context.Context, entry *logEntry) { la.waitDur = minRetryWait } - if err := la.callLogReader(ctx, entry.partitionInfo, func(lm storage.LogReader) { - lm.AcknowledgeConsumerPosition(entry.lsn) + if err := la.callLogReader(ctx, entry.partitionInfo, func(lm storage.LogReader) error { + return lm.AcknowledgePosition(log.ConsumerPosition, entry.lsn) }); err != nil { la.logger.WithError(err).WithFields( - log.Fields{ + logging.Fields{ "storage": entry.partitionInfo.StorageName, "partition_id": entry.partitionInfo.PartitionID, "lsn": entry.lsn, @@ -398,15 +401,16 @@ func (la *LogEntryArchiver) processEntries(ctx context.Context, inCh, outCh chan // processEntry checks if an existing backup exists, and performs a backup if not present. 
func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) { - logger := la.logger.WithFields(log.Fields{ + logger := la.logger.WithFields(logging.Fields{ "storage": entry.partitionInfo.StorageName, "partition_id": entry.partitionInfo.PartitionID, "lsn": entry.lsn, }) var entryPath string - if err := la.callLogReader(context.Background(), entry.partitionInfo, func(lm storage.LogReader) { + if err := la.callLogReader(context.Background(), entry.partitionInfo, func(lm storage.LogReader) error { entryPath = lm.GetEntryPath(entry.lsn) + return nil }); err != nil { la.backupCounter.WithLabelValues("fail").Add(1) la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for entry path") diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index 2404ba3d45d..bef0b03ecec 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -76,7 +76,7 @@ type mockLogManager struct { storage.LogReader } -func (lm *mockLogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { +func (lm *mockLogManager) AcknowledgePosition(_ storage.PositionType, lsn storage.LSN) error { lm.Lock() defer lm.Unlock() @@ -109,6 +109,7 @@ func (lm *mockLogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { lm.finishCount-- lm.finishFunc() } + return nil } func (lm *mockLogManager) SendNotification() { diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 004dca43cfb..6581317840e 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -169,15 +169,9 @@ type LogReader interface { // GetEntryPath returns the path of the log entry's root directory. GetEntryPath(lsn LSN) string - // The following functions allows other components acknowledge their positions. The log manager uses those - // positions to prune entries. Those interfaces are not great. 
We have a plan to refator them in: - // https://gitlab.com/gitlab-org/gitaly/-/issues/6528 - - // AcknowledgeConsumerPosition acknowledges log entries up and including lsn as successfully processed - // for the specified LogConsumer. - AcknowledgeConsumerPosition(lsn LSN) - // AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. - AcknowledgeAppliedPosition(lsn LSN) + // AcknowledgePosition acknowledges log entries up to and including lsn as successfully processed + // for the specified position type. + AcknowledgePosition(PositionType, LSN) error } // LogManager is the interface used to manage the underlying Write-Ahead Log entries. diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index c0964210b8a..d66726360dd 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -158,17 +158,6 @@ func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) erro return nil } -// AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. -func (mgr *Manager) AcknowledgeAppliedPosition(lsn storage.LSN) { - _ = mgr.AcknowledgePosition(AppliedPosition, lsn) -} - -// AcknowledgeConsumerPosition acknowledges log entries up and including LSN as successfully processed for the specified -// LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction. -func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { - _ = mgr.AcknowledgePosition(ConsumerPosition, lsn) -} - // AcknowledgePosition acknowledges the position of a position type. 
func (mgr *Manager) AcknowledgePosition(t storage.PositionType, lsn storage.LSN) error { if err := mgr.positionTracker.Set(t.Name, lsn); err != nil { diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 8cc13398b0b..2324a359362 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -259,7 +259,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Set this entry as applied - logManager.AcknowledgeAppliedPosition(1) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 1)) waitUntilPruningFinish(t, logManager) // After removal @@ -296,9 +296,9 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Set the applied LSN to 2 - logManager.AcknowledgeAppliedPosition(2) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) // Manually set the consumer's position to the first entry, forcing low-water mark to retain it - logManager.AcknowledgeConsumerPosition(1) + require.NoError(t, logManager.AcknowledgePosition(ConsumerPosition, 1)) waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(2), logManager.oldestLSN) @@ -341,7 +341,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Set the applied LSN to 3, allowing the first three entries to be pruned - logManager.AcknowledgeAppliedPosition(3) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 3)) waitUntilPruningFinish(t, logManager) // Ensure only entries starting from LSN 4 are retained @@ -382,7 +382,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { require.NoError(t, os.Chmod(infectedPath, 0o444)) // The error is notified via notification queue so that the caller can act accordingly - logManager.AcknowledgeAppliedPosition(5) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 5)) 
require.ErrorContains(t, <-logManager.GetNotificationQueue(), "permission denied") require.NoError(t, logManager.Close()) @@ -443,12 +443,12 @@ func TestLogManager_PruneLogEntries(t *testing.T) { if logManager.AppendedLSN() == totalLSN { return } - logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN()) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, logManager.AppendedLSN())) } }() } wg.Wait() - logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN()) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, logManager.AppendedLSN())) require.NoError(t, logManager.Close()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ @@ -550,8 +550,8 @@ func TestLogManager_Positions(t *testing.T) { ctx := testhelper.Context(t) simulatePositions := func(t *testing.T, logManager *Manager, consumed storage.LSN, applied storage.LSN) { - logManager.AcknowledgeConsumerPosition(consumed) - logManager.AcknowledgeAppliedPosition(applied) + require.NoError(t, logManager.AcknowledgePosition(ConsumerPosition, consumed)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, applied)) } t.Run("consumer pos is set to 0 after initialized", func(t *testing.T) { @@ -601,7 +601,7 @@ func TestLogManager_Positions(t *testing.T) { // Restart the log consumer. 
mockConsumer = &mockLogConsumer{} - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer, newTracker(t, nil)) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer, newTracker(t, mockConsumer)) require.NoError(t, logManager.Initialize(ctx, 2)) // Notify consumer to consume from 2 -> 4 @@ -875,7 +875,7 @@ func TestLogManager_Close(t *testing.T) { appendLogEntry(t, logManager, map[string][]byte{"2": []byte("content-2")}) // Trigger pruning - logManager.AcknowledgeAppliedPosition(2) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) // Close the manager and ensure all tasks are completed require.NoError(t, logManager.Close()) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index fa1b37b87ff..24671dad1fc 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -36,6 +36,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/protobuf/proto" @@ -1419,7 +1420,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction := openTransactions[step.TransactionID] transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: - transactionManager.logManager.AcknowledgeConsumerPosition(step.LSN) + require.NoError(t, transactionManager.logManager.AcknowledgePosition(log.ConsumerPosition, step.LSN)) case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: 
transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 5d2ec476485..4ad8485e11d 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -35,6 +35,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/fshistory" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal/reftree" @@ -2355,7 +2356,9 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { if err := mgr.logManager.Initialize(ctx, mgr.appliedLSN); err != nil { return fmt.Errorf("initialize log management: %w", err) } - mgr.logManager.AcknowledgeAppliedPosition(mgr.appliedLSN) + if err := mgr.logManager.AcknowledgePosition(log.AppliedPosition, mgr.appliedLSN); err != nil { + return fmt.Errorf("acknowledge applied LSN: %w", err) + } if err := os.Mkdir(mgr.snapshotsDir(), mode.Directory); err != nil { return fmt.Errorf("create snapshot manager directory: %w", err) @@ -3075,7 +3078,9 @@ func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error { if err := mgr.setKey(keyAppliedLSN, lsn.ToProto()); err != nil { return err } - mgr.logManager.AcknowledgeAppliedPosition(lsn) + if err := mgr.logManager.AcknowledgePosition(log.AppliedPosition, lsn); err != nil { + return fmt.Errorf("acknowledge applied LSN: %w", err) + } 
mgr.appliedLSN = lsn return nil } -- GitLab