From ee826c7e9d2d93c2e5ce254a8ed64e5159410fd7 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 20 Dec 2024 11:54:15 +0700 Subject: [PATCH 1/3] storage: Add context management and graceful shutdown to log.Manager This commit enhances `log.Manager` by introducing centralized context management (`ctx` and `cancel`) and a `Close` method for graceful shutdown. These changes prevent misuse (e.g., double initialization), simplify the API by embedding context internally, and ensure proper resource cleanup. The improvements also prepare `log.Manager` for upcoming changes where background workers will be added to handle log entry pruning and other asynchronous tasks. --- internal/backup/log_entry_test.go | 10 +- internal/gitaly/storage/storage.go | 8 +- .../storagemgr/partition/log/log_manager.go | 56 ++++-- .../partition/log/log_manager_test.go | 168 ++++++++++++++---- .../partition/transaction_manager.go | 9 +- .../partition/transaction_manager_test.go | 13 +- 6 files changed, 202 insertions(+), 62 deletions(-) diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index 710ccad1a9c..c4a4ae2c686 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -617,7 +617,7 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { logManagers = append(logManagers, logManager) } - appendLogEntry(t, ctx, logManagers[0], map[string][]byte{ + appendLogEntry(t, logManagers[0], map[string][]byte{ "1": []byte("content-1"), "2": []byte("content-2"), "3": []byte("content-3"), @@ -625,10 +625,10 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { <-logManagers[0].GetNotificationQueue() // KV operation without any file. - appendLogEntry(t, ctx, logManagers[1], map[string][]byte{}) + appendLogEntry(t, logManagers[1], map[string][]byte{}) <-logManagers[1].GetNotificationQueue() - appendLogEntry(t, ctx, logManagers[1], map[string][]byte{ + appendLogEntry(t, logManagers[1], map[string][]byte{ "4": []byte("content-4"), "5": []byte("content-5"), "6": []byte("content-6"), @@ -680,7 +680,7 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { testhelper.RequirePromMetrics(t, archiver.backupCounter, buildMetrics(t, 3, 0)) } -func appendLogEntry(t *testing.T, ctx context.Context, manager *log.Manager, files map[string][]byte) storage.LSN { +func appendLogEntry(t *testing.T, manager *log.Manager, files map[string][]byte) storage.LSN { t.Helper() logEntryPath := testhelper.TempDir(t) @@ -689,7 +689,7 @@ func appendLogEntry(t *testing.T, ctx context.Context, manager *log.Manager, fil require.NoError(t, os.WriteFile(path, value, mode.File)) } - nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + nextLSN, err := manager.AppendLogEntry(logEntryPath) require.NoError(t, err) return nextLSN diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 54e6f1cb02d..5593fc0610e 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -188,14 +188,18 @@ type LogManager interface { // It ensures the environment is ready, and previous states are resumed correctly. Initialize(ctx context.Context, appliedLSN LSN) error + // Close stops the log manager, cleans up resources, and stops internal workers. The caller + // is blocked until complete. + Close() error + // AppendLogEntry appends an entry to the WAL. logEntryPath specifies the directory of the log entry. It returns // the Log Sequence Number (LSN) of the appended log entry. 
- AppendLogEntry(ctx context.Context, logEntryPath string) (LSN, error) + AppendLogEntry(logEntryPath string) (LSN, error) // PruneLogEntries removes log entries that are no longer needed, based on retention policies. This is a leaky // abstraction. The log manager should be fully responsible for the life cycle of log entries. We'll remove // external access to log entry pruning in https://gitlab.com/gitlab-org/gitaly/-/issues/6529. - PruneLogEntries(ctx context.Context) error + PruneLogEntries() error // AppendedLSN returns the LSN of the latest appended log entry. AppendedLSN() LSN diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index 70e71981faf..d8234531dd1 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -61,6 +61,13 @@ func (p *position) setPosition(pos storage.LSN) { // references and acknowledgements. It effectively abstracts WAL operations from the TransactionManager, contributing to // a cleaner separation of concerns and making the system more maintainable and extensible. type Manager struct { + // ctx is the context that associated with the Manager instance. This is used for managing cancellations, + // deadlines, and carrying request-scoped values across log operations. + ctx context.Context + // cancel is the cancellation function for the Manager's context. It is used to explicitly cancel the context + // and signal internal workers to stop. It ensures proper cleanup when the Manager is no longer in use. + cancel context.CancelFunc + // mutex protects access to critical states, especially `oldestLSN` and `appendedLSN`, as well as the integrity // of inflight log entries. Since indices are monotonic, two parallel log appending operations result in pushing // files into the same directory and breaking the manifest file. Thus, Parallel log entry appending and pruning @@ -121,7 +128,18 @@ func NewManager(storageName string, partitionID storage.PartitionID, stagingDire // initial log entry state, enabling consumers to start processing from the correct point. Proper initialization is // crucial for maintaining data consistency and ensuring that log entries are managed accurately upon system startup. func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) error { - if err := mgr.createStateDirectory(ctx); err != nil { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + if mgr.ctx != nil { + return fmt.Errorf("log manager already initialized") + } else if ctx.Err() != nil { + return ctx.Err() + } + + mgr.ctx, mgr.cancel = context.WithCancel(ctx) + + if err := mgr.createStateDirectory(); err != nil { return fmt.Errorf("create state directory: %w", err) } @@ -184,14 +202,30 @@ func (mgr *Manager) GetNotificationQueue() <-chan struct{} { return mgr.notifyQueue } +// Close gracefully shuts down the log manager by canceling its context and signaling any associated internal workers to +// stop. The closer is blocked until all resources are released and workers (if any) have already stopped. +func (mgr *Manager) Close() error { + if mgr.cancel == nil { + return fmt.Errorf("log manager has not been initialized") + } + mgr.cancel() + return nil +} + // AppendLogEntry appends an entry to the write-ahead log. logEntryPath is an // absolute path to the directory that represents the log entry. 
appendLogEntry // moves the log entry's directory to the WAL, and returns its LSN once it has // been committed to the log. -func (mgr *Manager) AppendLogEntry(ctx context.Context, logEntryPath string) (storage.LSN, error) { +func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { mgr.mutex.Lock() defer mgr.mutex.Unlock() + select { + case <-mgr.ctx.Done(): + return 0, mgr.ctx.Err() + default: + } + nextLSN := mgr.appendedLSN + 1 // Move the log entry from the staging directory into its place in the log. @@ -201,7 +235,7 @@ func (mgr *Manager) AppendLogEntry(ctx context.Context, logEntryPath string) (st } // After this sync, the log entry has been persisted and will be recovered on failure. - if err := safe.NewSyncer().SyncParent(ctx, destinationPath); err != nil { + if err := safe.NewSyncer().SyncParent(mgr.ctx, destinationPath); err != nil { // If this fails, the log entry will be left in the write-ahead log but it is not // properly persisted. If the fsync fails, something is seriously wrong and there's no // point trying to delete the files. The right thing to do is to terminate Gitaly @@ -220,7 +254,7 @@ func (mgr *Manager) AppendLogEntry(ctx context.Context, logEntryPath string) (st return nextLSN, nil } -func (mgr *Manager) createStateDirectory(ctx context.Context) error { +func (mgr *Manager) createStateDirectory() error { needsFsync := false for _, path := range []string{ mgr.stateDirectory, @@ -244,11 +278,11 @@ func (mgr *Manager) createStateDirectory(ctx context.Context) error { } syncer := safe.NewSyncer() - if err := syncer.SyncRecursive(ctx, mgr.stateDirectory); err != nil { + if err := syncer.SyncRecursive(mgr.ctx, mgr.stateDirectory); err != nil { return fmt.Errorf("sync: %w", err) } - if err := syncer.SyncParent(ctx, mgr.stateDirectory); err != nil { + if err := syncer.SyncParent(mgr.ctx, mgr.stateDirectory); err != nil { return fmt.Errorf("sync parent: %w", err) } @@ -259,7 +293,7 @@ func (mgr *Manager) createStateDirectory(ctx context.Context) error { // needed. It ensures efficient storage management by removing redundant entries while maintaining the integrity of the // log sequence. The method respects the established low-water mark, ensuring no entries that might still be required // for transaction consistency are deleted. -func (mgr *Manager) PruneLogEntries(ctx context.Context) error { +func (mgr *Manager) PruneLogEntries() error { mgr.mutex.Lock() defer mgr.mutex.Unlock() @@ -278,7 +312,7 @@ func (mgr *Manager) PruneLogEntries(ctx context.Context) error { // Low-water mark // for mgr.oldestLSN < mgr.lowWaterMark() { - if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { + if err := mgr.deleteLogEntry(mgr.oldestLSN); err != nil { return fmt.Errorf("deleting log entry: %w", err) } mgr.oldestLSN++ @@ -300,8 +334,8 @@ func (mgr *Manager) GetEntryPath(lsn storage.LSN) string { } // deleteLogEntry deletes the log entry at the given LSN from the log. 
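To make the lifecycle these hunks introduce concrete before the deleteLogEntry change that follows, here is a minimal, self-contained Go sketch of the same pattern. This is not Gitaly code and every name in it is illustrative: Initialize captures a derived context and its cancel function exactly once, operations check that context before doing work, and Close cancels it.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// worker mirrors the pattern the hunks above apply to log.Manager: the context
// is captured once at initialization instead of being threaded through every
// method call.
type worker struct {
	mu     sync.Mutex
	ctx    context.Context
	cancel context.CancelFunc
}

// Initialize derives an internal context and rejects double initialization.
func (w *worker) Initialize(ctx context.Context) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.ctx != nil {
		return errors.New("already initialized")
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	w.ctx, w.cancel = context.WithCancel(ctx)
	return nil
}

// Do refuses to run once the internal context has been canceled.
func (w *worker) Do() error {
	select {
	case <-w.ctx.Done():
		return w.ctx.Err()
	default:
	}
	fmt.Println("doing work")
	return nil
}

// Close cancels the internal context, signaling every operation to stop.
func (w *worker) Close() error {
	if w.cancel == nil {
		return errors.New("not initialized")
	}
	w.cancel()
	return nil
}

func main() {
	var w worker
	if err := w.Initialize(context.Background()); err != nil {
		panic(err)
	}
	_ = w.Do()          // prints "doing work"
	_ = w.Close()       // cancels the internal context
	fmt.Println(w.Do()) // prints "context canceled"
}

The point of the design is that callers no longer thread a context through every call; cancellation is owned by the manager itself.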
-func (mgr *Manager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { - defer trace.StartRegion(ctx, "deleteLogEntry").End() +func (mgr *Manager) deleteLogEntry(lsn storage.LSN) error { + defer trace.StartRegion(mgr.ctx, "deleteLogEntry").End() tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") if err != nil { @@ -319,7 +353,7 @@ func (mgr *Manager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { return fmt.Errorf("rename: %w", err) } - if err := safe.NewSyncer().SyncParent(ctx, logEntryPath); err != nil { + if err := safe.NewSyncer().SyncParent(mgr.ctx, logEntryPath); err != nil { return fmt.Errorf("sync file deletion: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 27ca31a57e1..6b3aef55bc3 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -14,7 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) -func appendLogEntry(t *testing.T, ctx context.Context, manager *Manager, files map[string][]byte) storage.LSN { +func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) storage.LSN { t.Helper() logEntryPath := testhelper.TempDir(t) @@ -23,7 +23,7 @@ func appendLogEntry(t *testing.T, ctx context.Context, manager *Manager, files m require.NoError(t, os.WriteFile(path, value, mode.File)) } - nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + nextLSN, err := manager.AppendLogEntry(logEntryPath) require.NoError(t, err) return nextLSN @@ -64,8 +64,8 @@ func TestLogManager_Initialize(t *testing.T) { logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) @@ -89,10 +89,10 @@ func TestLogManager_Initialize(t *testing.T) { stateDir := testhelper.TempDir(t) logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) - require.NoError(t, logManager.createStateDirectory(ctx)) + require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) @@ -123,7 +123,7 @@ func TestLogManager_Initialize(t *testing.T) { require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) @@ -144,6 +144,48 @@ func TestLogManager_Initialize(t *testing.T) { "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, }) }) + + t.Run("double initialization error", func(t *testing.T) { + t.Parallel() + ctx := 
testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + // Attempt to initialize again + err := logManager.Initialize(ctx, 0) + require.Error(t, err) + require.Equal(t, "log manager already initialized", err.Error()) + }) + + t.Run("context canceled before initialization", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(testhelper.Context(t)) + cancel() // Cancel the context before initializing + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + err := logManager.Initialize(ctx, 0) + + require.Error(t, err) + require.Equal(t, context.Canceled, err) + }) + + t.Run("context canceled after initialization", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(testhelper.Context(t)) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + // Cancel the context after initialization + cancel() + + // Check if the manager's context was also canceled + require.EqualError(t, logManager.ctx.Err(), context.Canceled.Error()) + }) } func TestLogManager_PruneLogEntries(t *testing.T) { @@ -155,7 +197,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { logManager := setupLogManager(t, ctx, nil) // Expect no entries to be removed - require.NoError(t, logManager.PruneLogEntries(ctx)) + require.NoError(t, logManager.PruneLogEntries()) require.Equal(t, storage.LSN(1), logManager.oldestLSN) // Assert on-disk state @@ -171,7 +213,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { logManager := setupLogManager(t, ctx, nil) // Inject a single log entry - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) // Set this entry as applied logManager.AcknowledgeAppliedPosition(1) @@ -185,7 +227,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Attempt to remove applied log entries - require.NoError(t, logManager.PruneLogEntries(ctx)) + require.NoError(t, logManager.PruneLogEntries()) // After removal require.Equal(t, storage.LSN(2), logManager.oldestLSN) @@ -202,7 +244,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { // Inject multiple log entries for i := 0; i < 3; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } // Set the applied LSN to 2 @@ -222,7 +264,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, }) - require.NoError(t, logManager.PruneLogEntries(ctx)) + require.NoError(t, logManager.PruneLogEntries()) require.Equal(t, storage.LSN(2), logManager.oldestLSN) // Assert on-disk state to ensure no entries were removed @@ -243,7 +285,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { // Inject multiple log entries for i := 0; i < 5; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } // Set the applied LSN to 3, allowing the first three entries to be pruned @@ -265,7 +307,7 @@ func TestLogManager_PruneLogEntries(t 
*testing.T) { "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, }) - require.NoError(t, logManager.PruneLogEntries(ctx)) + require.NoError(t, logManager.PruneLogEntries()) // Ensure only entries starting from LSN 4 are retained require.Equal(t, storage.LSN(4), logManager.oldestLSN) @@ -293,7 +335,7 @@ func TestLogManager_AppendLogEntry(t *testing.T) { require.Equal(t, logManager.appendedLSN, storage.LSN(0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) require.Equal(t, logManager.appendedLSN, storage.LSN(1)) testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ @@ -312,7 +354,7 @@ func TestLogManager_AppendLogEntry(t *testing.T) { require.Equal(t, logManager.appendedLSN, storage.LSN(0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{ + appendLogEntry(t, logManager, map[string][]byte{ "1": []byte("content-1"), "2": []byte("content-2"), "3": []byte("content-3"), @@ -337,9 +379,9 @@ func TestLogManager_AppendLogEntry(t *testing.T) { require.Equal(t, logManager.appendedLSN, storage.LSN(0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2-1"), "2": []byte("content-2-2")}) - appendLogEntry(t, ctx, logManager, nil) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2-1"), "2": []byte("content-2-2")}) + appendLogEntry(t, logManager, nil) require.Equal(t, logManager.appendedLSN, storage.LSN(3)) testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ @@ -373,7 +415,7 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions := func(t *testing.T, logManager *Manager, consumed storage.LSN, applied storage.LSN) { logManager.AcknowledgeConsumerPosition(consumed) logManager.AcknowledgeAppliedPosition(applied) - require.NoError(t, logManager.PruneLogEntries(ctx)) + require.NoError(t, logManager.PruneLogEntries()) } t.Run("consumer pos is set to 0 after initialized", func(t *testing.T) { @@ -398,8 +440,8 @@ func TestLogManager_Positions(t *testing.T) { logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) require.NoError(t, logManager.Initialize(ctx, 0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) // Apply to 3 but consume to 1 simulatePositions(t, logManager, 1, 2) @@ -407,8 +449,8 @@ func TestLogManager_Positions(t *testing.T) { require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) // Inject 3, 4 - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-4")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-4")}) testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -445,8 +487,8 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, 
ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 0, 2) @@ -467,8 +509,8 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 1, 2) @@ -487,8 +529,8 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 2, 0) @@ -509,10 +551,10 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) simulatePositions(t, logManager, 1, 1) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 2, 2) // The oldest LSN changes after each acknowledgement @@ -529,13 +571,13 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) simulatePositions(t, logManager, 0, 1) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 0, 2) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) simulatePositions(t, logManager, 3, 3) require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) @@ -549,9 +591,9 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) // 2 and 3 are not applied, hence kept intact. 
simulatePositions(t, logManager, 3, 1) @@ -575,3 +617,53 @@ func TestLogManager_Positions(t *testing.T) { }) }) } + +func TestLogManager_Close(t *testing.T) { + t.Parallel() + + t.Run("close uninitialized manager", func(t *testing.T) { + t.Parallel() + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + + // Attempt to close the manager before initialization + err := logManager.Close() + require.Error(t, err) + require.Equal(t, "log manager has not been initialized", err.Error()) + }) + + t.Run("close after initialization", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + + // Properly initialize the manager + require.NoError(t, logManager.Initialize(ctx, 0)) + + // Close the manager + require.NoError(t, logManager.Close()) + + // Verify the context has been canceled + require.EqualError(t, logManager.ctx.Err(), context.Canceled.Error()) + }) + + t.Run("close after appending log entries", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Append some log entries + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"2": []byte("content-2")}) + + // Close the manager + require.NoError(t, logManager.Close()) + + // Verify the context has been canceled + require.EqualError(t, logManager.ctx.Err(), context.Canceled.Error()) + + // Further appending should fail due to the canceled context + _, err := logManager.AppendLogEntry(testhelper.TempDir(t)) + require.Error(t, err) + require.Equal(t, context.Canceled, err) + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index ad56f3c5ae1..dc0f632a0ff 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -1990,6 +1990,11 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { returnedErr = errors.Join(returnedErr, fmt.Errorf("clean up worker: %w", err)) } }() + defer func() { + if err := mgr.logManager.Close(); err != nil { + returnedErr = errors.Join(returnedErr, fmt.Errorf("stopping log manager: %w", err)) + } + }() // Defer the Stop in order to release all on-going Commit calls in case of error. defer close(mgr.closed) defer mgr.Close() @@ -2008,7 +2013,7 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { continue } - if err := mgr.logManager.PruneLogEntries(mgr.ctx); err != nil { + if err := mgr.logManager.PruneLogEntries(); err != nil { return fmt.Errorf("pruning log entries: %w", err) } @@ -2985,7 +2990,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende // After this latch block, the transaction is committed and all subsequent transactions // are guaranteed to read it. 
-	appendedLSN, err := mgr.logManager.AppendLogEntry(ctx, logEntryPath)
+	appendedLSN, err := mgr.logManager.AppendLogEntry(logEntryPath)
 	if err != nil {
 		mgr.mutex.Lock()
 		delete(mgr.snapshotLocks, mgr.logManager.AppendedLSN()+1)
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
index e5e9cefddfc..b5d41f1db0a 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
@@ -2263,15 +2263,20 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
 			manifestBytes, err := proto.Marshal(expectedManifest.Content.(proto.Message))
 			require.NoError(t, err)
 			require.NoError(t, os.WriteFile(manifestPath(logEntryPath), manifestBytes, mode.File))
-			require.NoError(t, tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, nil, logEntryPath))
+
+			logManager := log.NewManager(tm.storageName, setup.PartitionID, testhelper.TempDir(t), filepath.Join(tm.storagePath, "state"), setup.Consumer)
+			require.NoError(t, logManager.Initialize(ctx, 3))
+			lsn, err := logManager.AppendLogEntry(logEntryPath)
+			require.NoError(t, err)
+			require.Equal(t, lsn, storage.LSN(4))

 			RequireDatabase(t, ctx, tm.db, DatabaseState{
 				string(keyAppliedLSN): storage.LSN(3).ToProto(),
 			})

 			// Transaction 2 and 3 are left-over.
-			require.NoDirExists(t, tm.logManager.GetEntryPath(2))
-			require.NoDirExists(t, tm.logManager.GetEntryPath(3))
-			testhelper.RequireDirectoryState(t, tm.logManager.GetEntryPath(4), "", testhelper.DirectoryState{
+			require.NoDirExists(t, logManager.GetEntryPath(2))
+			require.NoDirExists(t, logManager.GetEntryPath(3))
+			testhelper.RequireDirectoryState(t, logManager.GetEntryPath(4), "", testhelper.DirectoryState{
 				"/":         {Mode: mode.Directory},
 				"/MANIFEST": expectedManifest,
 				"/1":        {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")},
--
GitLab


From b70704fc94b4c373fba1f1b8b39f74ee7a1821f9 Mon Sep 17 00:00:00 2001
From: Quang-Minh Nguyen
Date: Fri, 20 Dec 2024 14:41:02 +0700
Subject: [PATCH 2/3] storage: Allow log manager to return an error via the
 notification queue

The log manager exposes a notification queue to communicate with the
transaction manager. The queue currently uses an empty struct to notify
of any changes. In upcoming commits, we'll need to let the transaction
manager know when the log manager has failed. This commit updates the
signature of the notification queue to an error channel.
---
 internal/backup/log_entry_test.go                    |  6 +++---
 internal/gitaly/storage/storage.go                   |  2 +-
 .../storage/storagemgr/partition/log/log_manager.go  | 10 +++++-----
 .../storagemgr/partition/transaction_manager.go      |  5 ++++-
 4 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go
index c4a4ae2c686..70b2113e1b0 100644
--- a/internal/backup/log_entry_test.go
+++ b/internal/backup/log_entry_test.go
@@ -622,18 +622,18 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) {
 		"2": []byte("content-2"),
 		"3": []byte("content-3"),
 	})
-	<-logManagers[0].GetNotificationQueue()
+	require.Nil(t, <-logManagers[0].GetNotificationQueue())

 	// KV operation without any file.
appendLogEntry(t, logManagers[1], map[string][]byte{}) - <-logManagers[1].GetNotificationQueue() + require.Nil(t, <-logManagers[1].GetNotificationQueue()) appendLogEntry(t, logManagers[1], map[string][]byte{ "4": []byte("content-4"), "5": []byte("content-5"), "6": []byte("content-6"), }) - <-logManagers[1].GetNotificationQueue() + require.Nil(t, <-logManagers[1].GetNotificationQueue()) archiver.Close() diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 5593fc0610e..466c4ea70c4 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -205,7 +205,7 @@ type LogManager interface { AppendedLSN() LSN // GetNotificationQueue returns a channel that is used to notify external components of changes. - GetNotificationQueue() <-chan struct{} + GetNotificationQueue() <-chan error } // Partition is responsible for a single partition of data. diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index d8234531dd1..32e4a1b675b 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -97,8 +97,8 @@ type Manager struct { // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL positions map[positionType]*position - // notifyQueue is a queue notifying when there is a new change. - notifyQueue chan struct{} + // notifyQueue is a queue notifying when there is a new change or there's something wrong with the log manager. + notifyQueue chan error } // NewManager returns an instance of Manager. @@ -116,7 +116,7 @@ func NewManager(storageName string, partitionID storage.PartitionID, stagingDire stateDirectory: stateDirectory, consumer: consumer, positions: positions, - notifyQueue: make(chan struct{}, 1), + notifyQueue: make(chan error, 1), } } @@ -192,13 +192,13 @@ func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { // Alert the outsider. If it has a pending acknowledgement already no action is required. select { - case mgr.notifyQueue <- struct{}{}: + case mgr.notifyQueue <- nil: default: } } // GetNotificationQueue returns a notify channel so that caller can poll new changes. 
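The hunk above is the heart of this patch: notifyQueue becomes a buffered error channel, where a nil send keeps meaning "something changed" and a non-nil value will later carry failures. Before the GetNotificationQueue signature change that follows, here is a hedged, standalone sketch of that single-slot error-channel pattern; the names are illustrative and not part of Gitaly.

package main

import (
	"errors"
	"fmt"
)

// notifier keeps a single-slot notification queue. A nil value means
// "something changed"; a non-nil value reports a failure to the consumer.
type notifier struct {
	queue chan error
}

func newNotifier() *notifier {
	return &notifier{queue: make(chan error, 1)}
}

// notifyChange performs a non-blocking send: if a signal is already pending,
// the consumer will wake up anyway, so the extra one is dropped.
func (n *notifier) notifyChange() {
	select {
	case n.queue <- nil:
	default:
	}
}

// notifyError reports a failure; here the send blocks until the consumer
// reads it so the error is not lost.
func (n *notifier) notifyError(err error) {
	n.queue <- err
}

func main() {
	n := newNotifier()

	n.notifyChange()
	fmt.Println(<-n.queue) // <nil>: a change happened, nothing went wrong

	n.notifyError(errors.New("pruning failed"))
	fmt.Println(<-n.queue) // pruning failed
}

The capacity-one buffer plus the non-blocking send means producers never stall: if a notification is already pending, the consumer will observe the change anyway.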
-func (mgr *Manager) GetNotificationQueue() <-chan struct{} {
+func (mgr *Manager) GetNotificationQueue() <-chan error {
 	return mgr.notifyQueue
 }

diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
index dc0f632a0ff..b79bebeae11 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
@@ -2044,7 +2044,10 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned
 		return errors.New("cleanup worker failed")
 	case <-mgr.completedQueue:
 		return nil
-	case <-mgr.logManager.GetNotificationQueue():
+	case logErr := <-mgr.logManager.GetNotificationQueue():
+		if logErr != nil {
+			return fmt.Errorf("log manager failed: %w", logErr)
+		}
 		return nil
 	case <-ctx.Done():
 	}
--
GitLab


From a55443670361f1aeae99f437d2b473f61c902ecd Mon Sep 17 00:00:00 2001
From: Quang-Minh Nguyen
Date: Fri, 20 Dec 2024 16:44:42 +0700
Subject: [PATCH 3/3] storage: Move log entry pruning to a concurrent task in
 log manager
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit moves log entry pruning from TransactionManager to a
concurrent goroutine managed by log.Manager. Previously, pruning was
performed synchronously in the transaction processing loop, leading to
head-of-line blocking. By relocating pruning to a dedicated goroutine,
we reduce contention and enhance transaction throughput.

Log entry pruning now lives in log.Manager. It is initiated whenever
there is an update in the log consuming position. The pruning task
operates independently. It ensures that only one pruning operation is
active at a time. Errors encountered during pruning are communicated
through the notify queue, and the Close method now waits for all active
pruning tasks to finish before shutting down. In this way, the entire
lifecycle of log management is now encapsulated in log.Manager.
Additionally, we no longer have a goroutine idling when it’s not
needed.
---
 internal/gitaly/storage/storage.go            |   5 -
 .../storagemgr/partition/log/log_manager.go   | 144 ++++++----
 .../partition/log/log_manager_test.go         | 270 ++++++++++++++----
 .../partition/transaction_manager.go          |   4 -
 .../transaction_manager_consumer_test.go      |  24 +-
 .../partition/transaction_manager_test.go     |  17 +-
 6 files changed, 325 insertions(+), 139 deletions(-)

diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go
index 466c4ea70c4..4afbccf00a4 100644
--- a/internal/gitaly/storage/storage.go
+++ b/internal/gitaly/storage/storage.go
@@ -196,11 +196,6 @@ type LogManager interface {
 	// the Log Sequence Number (LSN) of the appended log entry.
 	AppendLogEntry(logEntryPath string) (LSN, error)

-	// PruneLogEntries removes log entries that are no longer needed, based on retention policies. This is a leaky
-	// abstraction. The log manager should be fully responsible for the life cycle of log entries. We'll remove
-	// external access to log entry pruning in https://gitlab.com/gitlab-org/gitaly/-/issues/6529.
-	PruneLogEntries() error
-
 	// AppendedLSN returns the LSN of the latest appended log entry.
AppendedLSN() LSN diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index 32e4a1b675b..f44046aad14 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -68,11 +68,17 @@ type Manager struct { // and signal internal workers to stop. It ensures proper cleanup when the Manager is no longer in use. cancel context.CancelFunc - // mutex protects access to critical states, especially `oldestLSN` and `appendedLSN`, as well as the integrity + // mutex protects access to critical states, especially `appendedLSN`, as well as the integrity // of inflight log entries. Since indices are monotonic, two parallel log appending operations result in pushing // files into the same directory and breaking the manifest file. Thus, Parallel log entry appending and pruning // are not supported. mutex sync.Mutex + // pruningMutex guards `oldestLSN` from parallel access and ensures there's only one pruning task running at the + // same time. + pruningMutex sync.Mutex + // wg tracks the concurrent tasks spawned by the log manager (if any). It is used to ensure all + // tasks exit gracefully. + wg sync.WaitGroup // storageName is the name of the storage the Manager's partition is a member of. storageName string @@ -94,7 +100,8 @@ type Manager struct { // consumer is an external caller that may perform read-only operations against applied log entries. Log entries // are retained until the consumer has acknowledged past their LSN. consumer storage.LogConsumer - // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL + // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL log + // entries are only pruned when they are not used anymore. positions map[positionType]*position // notifyQueue is a queue notifying when there is a new change or there's something wrong with the log manager. @@ -109,6 +116,7 @@ func NewManager(storageName string, partitionID storage.PartitionID, stagingDire if consumer != nil { positions[consumerPosition] = newPosition() } + return &Manager{ storageName: storageName, partitionID: partitionID, @@ -180,6 +188,7 @@ func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) erro // AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. func (mgr *Manager) AcknowledgeAppliedPosition(lsn storage.LSN) { mgr.positions[appliedPosition].setPosition(lsn) + mgr.pruneLogEntries() } // AcknowledgeConsumerPosition acknowledges log entries up and including LSN as successfully processed for the specified @@ -189,6 +198,7 @@ func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { panic("log manager's consumer must be present prior to AcknowledgeConsumerPos call") } mgr.positions[consumerPosition].setPosition(lsn) + mgr.pruneLogEntries() // Alert the outsider. If it has a pending acknowledgement already no action is required. select { @@ -209,6 +219,7 @@ func (mgr *Manager) Close() error { return fmt.Errorf("log manager has not been initialized") } mgr.cancel() + mgr.wg.Wait() return nil } @@ -217,9 +228,6 @@ func (mgr *Manager) Close() error { // moves the log entry's directory to the WAL, and returns its LSN once it has // been committed to the log. 
func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { - mgr.mutex.Lock() - defer mgr.mutex.Unlock() - select { case <-mgr.ctx.Done(): return 0, mgr.ctx.Err() @@ -227,27 +235,34 @@ func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { } nextLSN := mgr.appendedLSN + 1 + if err := func() error { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // Move the log entry from the staging directory into its place in the log. + destinationPath := mgr.GetEntryPath(nextLSN) + if err := os.Rename(logEntryPath, destinationPath); err != nil { + return fmt.Errorf("move wal files: %w", err) + } - // Move the log entry from the staging directory into its place in the log. - destinationPath := mgr.GetEntryPath(nextLSN) - if err := os.Rename(logEntryPath, destinationPath); err != nil { - return 0, fmt.Errorf("move wal files: %w", err) - } + // After this sync, the log entry has been persisted and will be recovered on failure. + if err := safe.NewSyncer().SyncParent(mgr.ctx, destinationPath); err != nil { + // If this fails, the log entry will be left in the write-ahead log but it is not + // properly persisted. If the fsync fails, something is seriously wrong and there's no + // point trying to delete the files. The right thing to do is to terminate Gitaly + // immediately as going further could cause data loss and corruption. This error check + // will later be replaced with a panic that terminates Gitaly. + // + // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 + return fmt.Errorf("sync log entry: %w", err) + } + mgr.appendedLSN = nextLSN - // After this sync, the log entry has been persisted and will be recovered on failure. - if err := safe.NewSyncer().SyncParent(mgr.ctx, destinationPath); err != nil { - // If this fails, the log entry will be left in the write-ahead log but it is not - // properly persisted. If the fsync fails, something is seriously wrong and there's no - // point trying to delete the files. The right thing to do is to terminate Gitaly - // immediately as going further could cause data loss and corruption. This error check - // will later be replaced with a panic that terminates Gitaly. - // - // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 - return 0, fmt.Errorf("sync log entry: %w", err) + return nil + }(); err != nil { + return 0, err } - mgr.appendedLSN = nextLSN - if mgr.consumer != nil { mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) } @@ -257,6 +272,7 @@ func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { func (mgr *Manager) createStateDirectory() error { needsFsync := false for _, path := range []string{ + mgr.tmpDirectory, mgr.stateDirectory, filepath.Join(mgr.stateDirectory, "wal"), } { @@ -289,35 +305,62 @@ func (mgr *Manager) createStateDirectory() error { return nil } -// PruneLogEntries prunes log entries from the Write-Ahead Log (WAL) that have been committed and are no longer -// needed. It ensures efficient storage management by removing redundant entries while maintaining the integrity of the -// log sequence. The method respects the established low-water mark, ensuring no entries that might still be required -// for transaction consistency are deleted. -func (mgr *Manager) PruneLogEntries() error { - mgr.mutex.Lock() - defer mgr.mutex.Unlock() +// pruneLogEntries schedules a task to prune log entries from the Write-Ahead Log (WAL) that have been committed and are +// no longer needed. 
It ensures efficient storage management by removing redundant entries while maintaining the
+// integrity of the log sequence. The method respects the established low-water mark, ensuring no entries that might
+// still be required for transaction consistency are deleted. The pruning task is performed in parallel so that the
+// caller is unblocked instantly. It is guaranteed that only one pruning task runs at a time; if none is in flight, a
+// new one is initiated.
+func (mgr *Manager) pruneLogEntries() {
+	select {
+	case <-mgr.ctx.Done():
+		return
+	default:
+	}

-	// When a log entry is applied, if there is any log in front of it which are still referred, we cannot delete
-	// it. This condition is to prevent a "hole" in the list. A transaction referring to a log entry at the
-	// low-water mark might scan all afterward log entries. Thus, the manager needs to keep in the database.
-	//
-	//                  ┌── Consumer not acknowledged
-	//                  │       ┌─ Applied til this point
-	//   Can remove     │       │        ┌─ Free, but cannot be removed
-	//  ◄───────────►   │       │        │
-	// ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐
-	// └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
-	//  └─ oldestLSN    ▲
-	//                  │
-	//           Low-water mark
-	//
-	for mgr.oldestLSN < mgr.lowWaterMark() {
-		if err := mgr.deleteLogEntry(mgr.oldestLSN); err != nil {
-			return fmt.Errorf("deleting log entry: %w", err)
-		}
-		mgr.oldestLSN++
+	// Try to acquire the pruning mutex. If it cannot be acquired, another pruning task is running.
+	if pruning := mgr.pruningMutex.TryLock(); !pruning {
+		return
 	}
-	return nil
+
+	// Now we have the responsibility to initiate a new pruning task. We defer the Unlock() to ensure:
+	// - No concurrent goroutines can also perform pruning.
+	// - The below goroutine only begins once this function exits.
+	defer mgr.pruningMutex.Unlock()
+
+	mgr.wg.Add(1)
+	go func() {
+		defer mgr.wg.Done()
+
+		// This task acquires the same mutex used for the task creation above. This also guarantees that
+		// redundant goroutines (rare, but possible) wait until the prior one finishes.
+		mgr.pruningMutex.Lock()
+		defer mgr.pruningMutex.Unlock()
+
+		// All log entries below the low-water mark can be removed. However, we would like to maintain the
+		// oldest LSN. The log entries must be removed in order and the oldestLSN advances one by one. This
+		// approach is to prevent a log entry from being forgotten if the manager fails to remove it in a prior
+		// session.
+		//
+		//                  ┌── Consumer not acknowledged
+		//                  │       ┌─ Applied til this point
+		//   Can remove     │       │        ┌─ Not consumed nor applied, cannot be removed.
+		//  ◄───────────►   │       │        │
+		// ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐
+		// └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
+		//  └─ oldestLSN    ▲
+		//                  │
+		//           Low-water mark
+		//
+		for mgr.oldestLSN < mgr.lowWaterMark() {
+			if err := mgr.deleteLogEntry(mgr.oldestLSN); err != nil {
+				err = fmt.Errorf("deleting log entry: %w", err)
+				mgr.notifyQueue <- err
+				return
+			}
+			mgr.oldestLSN++
+		}
+	}()
 }

 // AppendedLSN returns the index of latest appended log entry.
@@ -370,6 +413,9 @@ func (mgr *Manager) deleteLogEntry(lsn storage.LSN) error {
 // lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than
 // this mark are removed.
 func (mgr *Manager) lowWaterMark() storage.LSN {
+	mgr.mutex.Lock()
+	defer mgr.mutex.Unlock()
+
 	minAcknowledged := mgr.appendedLSN + 1

 	// Position is the last acknowledged LSN, this is eligible for pruning.
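Before the test changes below, a compact standalone sketch may help illustrate the concurrency pattern pruneLogEntries uses above: a TryLock gate ensures at most one pruning task is spawned at a time, a WaitGroup lets Close wait for it, and any deletion failure would be reported on the notification channel. This is an illustration with made-up names and numbers, not the Gitaly implementation.

package main

import (
	"fmt"
	"sync"
)

// pruner removes entries below a low-water mark in a background goroutine.
// Extra prune calls while a task is in flight return immediately.
type pruner struct {
	mu          sync.Mutex // guards oldest and lowWater
	pruningMu   sync.Mutex // held while a pruning task is running
	wg          sync.WaitGroup
	oldest      int
	lowWater    int
	notifyQueue chan error
}

// prune spawns at most one background pruning task; callers never block.
func (p *pruner) prune() {
	// If the mutex is already held, a pruning task is already in flight.
	if !p.pruningMu.TryLock() {
		return
	}
	defer p.pruningMu.Unlock()

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()

		// Serialize behind the spawning call (and any earlier task).
		p.pruningMu.Lock()
		defer p.pruningMu.Unlock()

		p.mu.Lock()
		defer p.mu.Unlock()
		for p.oldest < p.lowWater {
			// A real implementation would delete the entry on disk here and
			// report a failure on notifyQueue instead of printing.
			fmt.Println("pruned entry", p.oldest)
			p.oldest++
		}
	}()
}

// close waits for any in-flight pruning task to finish.
func (p *pruner) close() {
	p.wg.Wait()
}

func main() {
	p := &pruner{oldest: 1, lowWater: 4, notifyQueue: make(chan error, 1)}
	p.prune()
	p.close()
	fmt.Println("oldest is now", p.oldest) // 4
}

Keeping the low-water-mark bookkeeping behind one mutex while the pruning task serializes on a second mutex is what lets appends and acknowledgements proceed while older entries are being deleted.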
diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 6b3aef55bc3..5df69f2deaa 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -36,6 +36,18 @@ func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogCons return logManager } +func waitUntilPruningFinish(t *testing.T, manager *Manager) { + // Users of log manager are blocked until log pruning task is done. Log pruning runs in parallel and should not + // conflict other activities of log manager. In this test suite, we need to assert in-between states. Thus, the + // tests must wait until a task finishes before asserting. The easiest solution is to use errgroup's Wait(). + manager.wg.Wait() +} + +func assertDirectoryState(t *testing.T, manager *Manager, expected testhelper.DirectoryState) { + waitUntilPruningFinish(t, manager) + testhelper.RequireDirectoryState(t, manager.stateDirectory, "", expected) +} + func TestLogManager_Initialize(t *testing.T) { t.Parallel() @@ -47,33 +59,40 @@ func TestLogManager_Initialize(t *testing.T) { logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) + waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(0), logManager.appendedLSN) require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) + + require.NoError(t, logManager.Close()) }) t.Run("existing WAL entries without existing appliedLSN", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) + + waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(2), logManager.appendedLSN) require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -81,68 +100,68 @@ func TestLogManager_Initialize(t *testing.T) { "/wal/0000000000002": {Mode: mode.Directory}, "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, }) + require.NoError(t, logManager.Close()) }) t.Run("existing WAL entries with appliedLSN in-between", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 
1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } + require.NoError(t, logManager.Close()) logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 2)) - require.Equal(t, storage.LSN(1), logManager.oldestLSN) + waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(3), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: mode.Directory}, - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, "/wal/0000000000003": {Mode: mode.Directory}, "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, }) + require.NoError(t, logManager.Close()) }) t.Run("existing WAL entries with up-to-date appliedLSN", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } + require.NoError(t, logManager.Close()) - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 3)) - require.Equal(t, storage.LSN(1), logManager.oldestLSN) + waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(4), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: mode.Directory}, - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, }) + + require.NoError(t, logManager.Close()) }) t.Run("double initialization error", func(t *testing.T) { @@ -157,6 +176,8 @@ func TestLogManager_Initialize(t *testing.T) { err := logManager.Initialize(ctx, 0) require.Error(t, err) require.Equal(t, "log manager already initialized", err.Error()) + + require.NoError(t, logManager.Close()) }) t.Run("context canceled before initialization", func(t *testing.T) { @@ -185,6 +206,7 @@ func TestLogManager_Initialize(t *testing.T) { // Check if the 
manager's context was also canceled
 		require.EqualError(t, logManager.ctx.Err(), context.Canceled.Error())
+		require.NoError(t, logManager.Close())
 	})
 }
@@ -196,12 +218,13 @@ func TestLogManager_PruneLogEntries(t *testing.T) {
 		ctx := testhelper.Context(t)
 		logManager := setupLogManager(t, ctx, nil)
 
-		// Expect no entries to be removed
-		require.NoError(t, logManager.PruneLogEntries())
+		// Expect no entries to be pruned
+		waitUntilPruningFinish(t, logManager)
+		require.Equal(t, storage.LSN(1), logManager.oldestLSN)
 
 		// Assert on-disk state
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":    {Mode: mode.Directory},
 			"/wal": {Mode: mode.Directory},
 		})
@@ -215,23 +238,21 @@ func TestLogManager_PruneLogEntries(t *testing.T) {
 		// Inject a single log entry
 		appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")})
 
-		// Set this entry as applied
-		logManager.AcknowledgeAppliedPosition(1)
-
 		// Before removal
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
 			"/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")},
 		})
 
-		// Attempt to remove applied log entries
-		require.NoError(t, logManager.PruneLogEntries())
+		// Set this entry as applied
+		logManager.AcknowledgeAppliedPosition(1)
+		waitUntilPruningFinish(t, logManager)
 
 		// After removal
 		require.Equal(t, storage.LSN(2), logManager.oldestLSN)
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":    {Mode: mode.Directory},
 			"/wal": {Mode: mode.Directory},
 		})
@@ -247,13 +268,8 @@ func TestLogManager_PruneLogEntries(t *testing.T) {
 			appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))})
 		}
 
-		// Set the applied LSN to 2
-		logManager.AcknowledgeAppliedPosition(2)
-		// Manually set the consumer's position to the first entry, forcing low-water mark to retain it
-		logManager.AcknowledgeConsumerPosition(1)
-
 		// Before removal
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
 			"/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")},
 			"/wal/0000000000002":   {Mode: mode.Directory},
 			"/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")},
 			"/wal/0000000000003":   {Mode: mode.Directory},
 			"/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")},
 		})
 
-		require.NoError(t, logManager.PruneLogEntries())
+		// Set the applied LSN to 2
+		logManager.AcknowledgeAppliedPosition(2)
+		// Manually set the consumer's position to the first entry, forcing low-water mark to retain it
+		logManager.AcknowledgeConsumerPosition(1)
+
+		waitUntilPruningFinish(t, logManager)
 		require.Equal(t, storage.LSN(2), logManager.oldestLSN)
 
 		// Assert on-disk state to ensure no entries were removed
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000002":   {Mode: mode.Directory},
@@ -288,11 +309,8 @@ func TestLogManager_PruneLogEntries(t *testing.T) {
 			appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))})
 		}
 
-		// Set the applied LSN to 3, allowing the first three entries to be pruned
-		logManager.AcknowledgeAppliedPosition(3)
-
 		// Before removal
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
@@ -307,20 +325,120 @@ func TestLogManager_PruneLogEntries(t *testing.T) {
 			"/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")},
 		})
 
-		require.NoError(t, logManager.PruneLogEntries())
+		// Set the applied LSN to 3, allowing the first three entries to be pruned
+		logManager.AcknowledgeAppliedPosition(3)
+		waitUntilPruningFinish(t, logManager)
 
 		// Ensure only entries starting from LSN 4 are retained
 		require.Equal(t, storage.LSN(4), logManager.oldestLSN)
 
 		// Assert on-disk state after removals
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
+			"/":                    {Mode: mode.Directory},
+			"/wal":                 {Mode: mode.Directory},
+			"/wal/0000000000004":   {Mode: mode.Directory},
+			"/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")},
+			"/wal/0000000000005":   {Mode: mode.Directory},
+			"/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")},
+		})
+	})
+
+	t.Run("log entry pruning fails", func(t *testing.T) {
+		t.Parallel()
+		ctx := testhelper.Context(t)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+
+		logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil)
+		require.NoError(t, logManager.Initialize(ctx, 0))
+
+		for i := 0; i < 5; i++ {
+			appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))})
+		}
+
+		infectedPath := logManager.GetEntryPath(3)
+
+		// Get the current permissions
+		info, err := os.Stat(infectedPath)
+		require.NoError(t, err)
+		originalMode := info.Mode()
+
+		// Mark log entry 3 read-only
+		require.NoError(t, os.Chmod(infectedPath, 0o444))
+
+		// The error is reported via the notification queue so that the caller can act accordingly
+		logManager.AcknowledgeAppliedPosition(5)
+		require.ErrorContains(t, <-logManager.GetNotificationQueue(), "permission denied")
+
+		require.NoError(t, logManager.Close())
+
+		// Restore the permission to assert the state
+		require.NoError(t, os.Chmod(infectedPath, originalMode))
 		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
+			"/wal/0000000000003":   {Mode: mode.Directory},
+			"/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")},
 			"/wal/0000000000004":   {Mode: mode.Directory},
 			"/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")},
 			"/wal/0000000000005":   {Mode: mode.Directory},
 			"/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")},
 		})
+
+		// Restart the manager
+		logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil)
+		require.NoError(t, logManager.Initialize(ctx, 5))
+
+		waitUntilPruningFinish(t, logManager)
+		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+			"/":    {Mode: mode.Directory},
+			"/wal": {Mode: mode.Directory},
+		})
+	})
+
+	t.Run("trigger log entry pruning concurrently", func(t *testing.T) {
+		t.Parallel()
+		ctx := testhelper.Context(t)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+
+		logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil)
+		require.NoError(t, logManager.Initialize(ctx, 0))
+
+		var wg sync.WaitGroup
+
+		const totalLSN = 25
+
+		// One producer goroutine
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < totalLSN; i++ {
+				appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))})
+			}
+		}()
+
+		// Three goroutines spam acknowledgements constantly.
+		for i := 0; i < 3; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for {
+					if logManager.AppendedLSN() == totalLSN {
+						return
+					}
+					logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN())
+				}
+			}()
+		}
+		wg.Wait()
+		logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN())
+
+		require.NoError(t, logManager.Close())
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
+			"/":    {Mode: mode.Directory},
+			"/wal": {Mode: mode.Directory},
+		})
 	})
 }
@@ -338,7 +456,8 @@ func TestLogManager_AppendLogEntry(t *testing.T) {
 		appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")})
 
 		require.Equal(t, logManager.appendedLSN, storage.LSN(1))
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
@@ -361,7 +480,8 @@ func TestLogManager_AppendLogEntry(t *testing.T) {
 		})
 
 		require.Equal(t, logManager.appendedLSN, storage.LSN(1))
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
@@ -384,7 +504,8 @@ func TestLogManager_AppendLogEntry(t *testing.T) {
 		appendLogEntry(t, logManager, nil)
 
 		require.Equal(t, logManager.appendedLSN, storage.LSN(3))
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
@@ -415,7 +536,6 @@ func TestLogManager_Positions(t *testing.T) {
 	simulatePositions := func(t *testing.T, logManager *Manager, consumed storage.LSN, applied storage.LSN) {
 		logManager.AcknowledgeConsumerPosition(consumed)
 		logManager.AcknowledgeAppliedPosition(applied)
-		require.NoError(t, logManager.PruneLogEntries())
 	}
 
 	t.Run("consumer pos is set to 0 after initialized", func(t *testing.T) {
@@ -425,7 +545,7 @@ func TestLogManager_Positions(t *testing.T) {
 		require.Equal(t, [][]storage.LSN(nil), mockConsumer.positions)
 		require.Equal(t, storage.LSN(1), logManager.lowWaterMark())
 
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":    {Mode: mode.Directory},
 			"/wal": {Mode: mode.Directory},
 		})
@@ -452,7 +572,7 @@ func TestLogManager_Positions(t *testing.T) {
 		appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")})
 		appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-4")})
 
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000002":   {Mode: mode.Directory},
@@ -473,11 +593,12 @@ func TestLogManager_Positions(t *testing.T) {
 
 		// Both consumer and applier catch up.
 		simulatePositions(t, logManager, 4, 4)
+		waitUntilPruningFinish(t, logManager)
 
 		// All log entries are pruned at this point. The consumer should not be notified again.
 		require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions)
 		require.Equal(t, storage.LSN(5), logManager.lowWaterMark())
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":    {Mode: mode.Directory},
 			"/wal": {Mode: mode.Directory},
 		})
@@ -495,7 +616,7 @@ func TestLogManager_Positions(t *testing.T) {
 		require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions)
 		require.Equal(t, storage.LSN(1), logManager.lowWaterMark())
 
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
@@ -517,7 +638,7 @@ func TestLogManager_Positions(t *testing.T) {
 		require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions)
 		require.Equal(t, storage.LSN(2), logManager.lowWaterMark())
 
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000002":   {Mode: mode.Directory},
@@ -537,7 +658,7 @@ func TestLogManager_Positions(t *testing.T) {
 		require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions)
 		require.Equal(t, storage.LSN(1), logManager.lowWaterMark())
 
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                    {Mode: mode.Directory},
 			"/wal":                 {Mode: mode.Directory},
 			"/wal/0000000000001":   {Mode: mode.Directory},
@@ -557,14 +678,16 @@ func TestLogManager_Positions(t *testing.T) {
 		appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")})
 		simulatePositions(t, logManager, 2, 2)
 
-		// The oldest LSN changes after each acknowledgement
-		require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions)
-		require.Equal(t, storage.LSN(3), logManager.lowWaterMark())
+		logManager.pruneLogEntries()
 
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":    {Mode: mode.Directory},
 			"/wal": {Mode: mode.Directory},
 		})
+
+		// The oldest LSN changes after each acknowledgement
+		require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions)
+		require.Equal(t, storage.LSN(3), logManager.lowWaterMark())
 	})
 
 	t.Run("append while consumer is busy with prior entries", func(t *testing.T) {
@@ -581,7 +704,8 @@ func TestLogManager_Positions(t *testing.T) {
 		simulatePositions(t, logManager, 3, 3)
 
 		require.Equal(t, storage.LSN(4), logManager.lowWaterMark())
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":    {Mode: mode.Directory},
 			"/wal": {Mode: mode.Directory},
 		})
@@ -600,7 +724,7 @@ func TestLogManager_Positions(t *testing.T) {
 		require.Equal(t, storage.LSN(2), logManager.lowWaterMark())
 
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":                  {Mode: mode.Directory},
 			"/wal":               {Mode: mode.Directory},
 			"/wal/0000000000002": {Mode: mode.Directory},
@@ -611,7 +735,8 @@ func TestLogManager_Positions(t *testing.T) {
 		simulatePositions(t, logManager, 3, 3)
 
 		require.Equal(t, storage.LSN(4), logManager.lowWaterMark())
-		testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{
+
+		assertDirectoryState(t, logManager, testhelper.DirectoryState{
 			"/":    {Mode: mode.Directory},
 			"/wal": {Mode: mode.Directory},
 		})
@@ -666,4 +791,23 @@ func TestLogManager_Close(t *testing.T) {
 		require.Error(t, err)
 		require.Equal(t, context.Canceled, err)
 	})
+
+	t.Run("close waits for pruning tasks", func(t *testing.T) {
+		t.Parallel()
+		ctx := testhelper.Context(t)
+		logManager := setupLogManager(t, ctx, nil)
+
+		// Inject log entries
+		appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")})
+		appendLogEntry(t, logManager, map[string][]byte{"2": []byte("content-2")})
+
+		// Trigger pruning
+		logManager.AcknowledgeAppliedPosition(2)
+
+		// Close the manager and ensure all tasks are completed
+		require.NoError(t, logManager.Close())
+
+		// Verify the oldestLSN after pruning
+		require.Equal(t, storage.LSN(3), logManager.oldestLSN)
+	})
 }
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
index b79bebeae11..05a41952df1 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
@@ -2013,10 +2013,6 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) {
 			continue
 		}
 
-		if err := mgr.logManager.PruneLogEntries(); err != nil {
-			return fmt.Errorf("pruning log entries: %w", err)
-		}
-
 		if err := mgr.processTransaction(ctx); err != nil {
 			return fmt.Errorf("process transaction: %w", err)
 		}
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go
index 17029b01082..37246625020 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go
@@ -377,20 +377,30 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti
 						"refs/heads/third": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Third.OID},
 					},
 				},
-				ConsumerAcknowledge{
-					LSN: 3,
-				},
 				CloseManager{},
 				StartManager{},
+				AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+					// Wait until the first acknowledgement after restart
+					<-tm.logManager.GetNotificationQueue()
+				}),
 			},
 			expectedState: StateAssertion{
 				Database: DatabaseState{
 					string(keyAppliedLSN): storage.LSN(3).ToProto(),
 				},
-				Directory: testhelper.DirectoryState{
-					"/":    {Mode: mode.Directory},
-					"/wal": {Mode: mode.Directory},
-				},
+				Directory: gittest.FilesOrReftables(
+					testhelper.DirectoryState{
+						"/":    {Mode: mode.Directory},
+						"/wal": {Mode: mode.Directory},
+						// 1 and 2 were pruned before the manager was closed.
+						// 3 is not pruned because the consumer hasn't acknowledged it after the
+						// restart.
+						"/wal/0000000000003":          {Mode: mode.Directory},
+						"/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/third", setup.Commits.Third.OID)),
+						"/wal/0000000000003/1":        {Mode: mode.File, Content: []byte(setup.Commits.Third.OID + "\n")},
+					}, buildReftableDirectory(map[int][]git.ReferenceUpdates{
+						3: {{"refs/heads/third": git.ReferenceUpdate{NewOID: setup.Commits.Third.OID}}},
+					})),
 				Repositories: RepositoryStates{
 					setup.RelativePath: {
 						DefaultBranch: "refs/heads/main",
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
index b5d41f1db0a..45daec4e614 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
@@ -2115,14 +2115,6 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
 					TransactionID: 4,
 					ExpectedError: storage.ErrTransactionProcessingStopped,
 				},
-				AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
-					RequireDatabase(t, ctx, tm.db, DatabaseState{
-						string(keyAppliedLSN): storage.LSN(3).ToProto(),
-					})
-					// Transaction 2 and 3 are left-over.
-					require.NoDirExists(t, tm.logManager.GetEntryPath(2))
-					require.NoDirExists(t, tm.logManager.GetEntryPath(3))
-				}),
 				StartManager{},
 				AssertManager{},
 				AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
@@ -2134,6 +2126,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
 					require.Equal(t, tm.appliedLSN, storage.LSN(3))
 					require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(3))
 				}),
+				CloseManager{},
 			},
 			expectedState: StateAssertion{
 				Database: DatabaseState{
@@ -2273,14 +2266,12 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
 					RequireDatabase(t, ctx, tm.db, DatabaseState{
 						string(keyAppliedLSN): storage.LSN(3).ToProto(),
 					})
-					// Transaction 2 and 3 are left-over.
-					require.NoDirExists(t, logManager.GetEntryPath(2))
-					require.NoDirExists(t, logManager.GetEntryPath(3))
 					testhelper.RequireDirectoryState(t, logManager.GetEntryPath(4), "", testhelper.DirectoryState{
 						"/":         {Mode: mode.Directory},
 						"/MANIFEST": expectedManifest,
 						"/1":        {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")},
 					})
+					require.NoError(t, logManager.Close())
 				}),
 				StartManager{},
 				AssertManager{},
@@ -2292,6 +2283,10 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
 					})
 					require.Equal(t, tm.appliedLSN, storage.LSN(4))
 					require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(4))
+
+					// Transaction 2 and 3 are left-over.
+					require.NoDirExists(t, tm.logManager.GetEntryPath(2))
+					require.NoDirExists(t, tm.logManager.GetEntryPath(3))
 				}),
 			},
 			expectedState: StateAssertion{
--
GitLab