diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index 710ccad1a9c5286847d562f49cb62190efeb9d2e..70b2113e1b089ff731703b23733b0a85bc262dae 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -617,23 +617,23 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { logManagers = append(logManagers, logManager) } - appendLogEntry(t, ctx, logManagers[0], map[string][]byte{ + appendLogEntry(t, logManagers[0], map[string][]byte{ "1": []byte("content-1"), "2": []byte("content-2"), "3": []byte("content-3"), }) - <-logManagers[0].GetNotificationQueue() + require.Nil(t, <-logManagers[0].GetNotificationQueue()) // KV operation without any file. - appendLogEntry(t, ctx, logManagers[1], map[string][]byte{}) - <-logManagers[1].GetNotificationQueue() + appendLogEntry(t, logManagers[1], map[string][]byte{}) + require.Nil(t, <-logManagers[1].GetNotificationQueue()) - appendLogEntry(t, ctx, logManagers[1], map[string][]byte{ + appendLogEntry(t, logManagers[1], map[string][]byte{ "4": []byte("content-4"), "5": []byte("content-5"), "6": []byte("content-6"), }) - <-logManagers[1].GetNotificationQueue() + require.Nil(t, <-logManagers[1].GetNotificationQueue()) archiver.Close() @@ -680,7 +680,7 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { testhelper.RequirePromMetrics(t, archiver.backupCounter, buildMetrics(t, 3, 0)) } -func appendLogEntry(t *testing.T, ctx context.Context, manager *log.Manager, files map[string][]byte) storage.LSN { +func appendLogEntry(t *testing.T, manager *log.Manager, files map[string][]byte) storage.LSN { t.Helper() logEntryPath := testhelper.TempDir(t) @@ -689,7 +689,7 @@ func appendLogEntry(t *testing.T, ctx context.Context, manager *log.Manager, fil require.NoError(t, os.WriteFile(path, value, mode.File)) } - nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + nextLSN, err := manager.AppendLogEntry(logEntryPath) require.NoError(t, err) return nextLSN diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 54e6f1cb02dea415c35118bfef8a261b2a95ff94..4afbccf00a47476accb7c84fe9360175e40cf2e7 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -188,20 +188,19 @@ type LogManager interface { // It ensures the environment is ready, and previous states are resumed correctly. Initialize(ctx context.Context, appliedLSN LSN) error + // Close stops the log manager, cleans up resources, and stops internal workers. The caller + // is blocked until complete. + Close() error + // AppendLogEntry appends an entry to the WAL. logEntryPath specifies the directory of the log entry. It returns // the Log Sequence Number (LSN) of the appended log entry. - AppendLogEntry(ctx context.Context, logEntryPath string) (LSN, error) - - // PruneLogEntries removes log entries that are no longer needed, based on retention policies. This is a leaky - // abstraction. The log manager should be fully responsible for the life cycle of log entries. We'll remove - // external access to log entry pruning in https://gitlab.com/gitlab-org/gitaly/-/issues/6529. - PruneLogEntries(ctx context.Context) error + AppendLogEntry(logEntryPath string) (LSN, error) // AppendedLSN returns the LSN of the latest appended log entry. AppendedLSN() LSN // GetNotificationQueue returns a channel that is used to notify external components of changes. 
- GetNotificationQueue() <-chan struct{} + GetNotificationQueue() <-chan error } // Partition is responsible for a single partition of data. diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index 70e71981faf8951c92e49553fa7347764bf2a480..f44046aad14b3ac7678076c2b799dc1591fe380a 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -61,11 +61,24 @@ func (p *position) setPosition(pos storage.LSN) { // references and acknowledgements. It effectively abstracts WAL operations from the TransactionManager, contributing to // a cleaner separation of concerns and making the system more maintainable and extensible. type Manager struct { - // mutex protects access to critical states, especially `oldestLSN` and `appendedLSN`, as well as the integrity + // ctx is the context that is associated with the Manager instance. This is used for managing cancellations, + // deadlines, and carrying request-scoped values across log operations. + ctx context.Context + // cancel is the cancellation function for the Manager's context. It is used to explicitly cancel the context + // and signal internal workers to stop. It ensures proper cleanup when the Manager is no longer in use. + cancel context.CancelFunc + + // mutex protects access to critical states, especially `appendedLSN`, as well as the integrity // of inflight log entries. Since indices are monotonic, two parallel log appending operations result in pushing // files into the same directory and breaking the manifest file. Thus, Parallel log entry appending and pruning // are not supported. mutex sync.Mutex + // pruningMutex guards `oldestLSN` from parallel access and ensures there's only one pruning task running at the + // same time. + pruningMutex sync.Mutex + // wg tracks the concurrent tasks spawned by the log manager (if any). It is used to ensure all + // tasks exit gracefully. + wg sync.WaitGroup // storageName is the name of the storage the Manager's partition is a member of. storageName string @@ -87,11 +100,12 @@ type Manager struct { // consumer is an external caller that may perform read-only operations against applied log entries. Log entries // are retained until the consumer has acknowledged past their LSN. consumer storage.LogConsumer - // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL + // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL log + // entries are only pruned when they are not used anymore. positions map[positionType]*position - // notifyQueue is a queue notifying when there is a new change. - notifyQueue chan struct{} + // notifyQueue is a queue notifying when there is a new change or there's something wrong with the log manager. + notifyQueue chan error } // NewManager returns an instance of Manager.
@@ -102,6 +116,7 @@ func NewManager(storageName string, partitionID storage.PartitionID, stagingDire if consumer != nil { positions[consumerPosition] = newPosition() } + return &Manager{ storageName: storageName, partitionID: partitionID, @@ -109,7 +124,7 @@ func NewManager(storageName string, partitionID storage.PartitionID, stagingDire stateDirectory: stateDirectory, consumer: consumer, positions: positions, - notifyQueue: make(chan struct{}, 1), + notifyQueue: make(chan error, 1), } } @@ -121,7 +136,18 @@ func NewManager(storageName string, partitionID storage.PartitionID, stagingDire // initial log entry state, enabling consumers to start processing from the correct point. Proper initialization is // crucial for maintaining data consistency and ensuring that log entries are managed accurately upon system startup. func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) error { - if err := mgr.createStateDirectory(ctx); err != nil { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + if mgr.ctx != nil { + return fmt.Errorf("log manager already initialized") + } else if ctx.Err() != nil { + return ctx.Err() + } + + mgr.ctx, mgr.cancel = context.WithCancel(ctx) + + if err := mgr.createStateDirectory(); err != nil { return fmt.Errorf("create state directory: %w", err) } @@ -162,6 +188,7 @@ func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) erro // AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. func (mgr *Manager) AcknowledgeAppliedPosition(lsn storage.LSN) { mgr.positions[appliedPosition].setPosition(lsn) + mgr.pruneLogEntries() } // AcknowledgeConsumerPosition acknowledges log entries up and including LSN as successfully processed for the specified @@ -171,58 +198,81 @@ func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { panic("log manager's consumer must be present prior to AcknowledgeConsumerPos call") } mgr.positions[consumerPosition].setPosition(lsn) + mgr.pruneLogEntries() // Alert the outsider. If it has a pending acknowledgement already no action is required. select { - case mgr.notifyQueue <- struct{}{}: + case mgr.notifyQueue <- nil: default: } } // GetNotificationQueue returns a notify channel so that caller can poll new changes. -func (mgr *Manager) GetNotificationQueue() <-chan struct{} { +func (mgr *Manager) GetNotificationQueue() <-chan error { return mgr.notifyQueue } +// Close gracefully shuts down the log manager by canceling its context and signaling any associated internal workers to +// stop. The closer is blocked until all resources are released and workers (if any) have already stopped. +func (mgr *Manager) Close() error { + if mgr.cancel == nil { + return fmt.Errorf("log manager has not been initialized") + } + mgr.cancel() + mgr.wg.Wait() + return nil +} + // AppendLogEntry appends an entry to the write-ahead log. logEntryPath is an // absolute path to the directory that represents the log entry. appendLogEntry // moves the log entry's directory to the WAL, and returns its LSN once it has // been committed to the log. 
-func (mgr *Manager) AppendLogEntry(ctx context.Context, logEntryPath string) (storage.LSN, error) { - mgr.mutex.Lock() - defer mgr.mutex.Unlock() +func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { + select { + case <-mgr.ctx.Done(): + return 0, mgr.ctx.Err() + default: + } nextLSN := mgr.appendedLSN + 1 + if err := func() error { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // Move the log entry from the staging directory into its place in the log. + destinationPath := mgr.GetEntryPath(nextLSN) + if err := os.Rename(logEntryPath, destinationPath); err != nil { + return fmt.Errorf("move wal files: %w", err) + } - // Move the log entry from the staging directory into its place in the log. - destinationPath := mgr.GetEntryPath(nextLSN) - if err := os.Rename(logEntryPath, destinationPath); err != nil { - return 0, fmt.Errorf("move wal files: %w", err) - } + // After this sync, the log entry has been persisted and will be recovered on failure. + if err := safe.NewSyncer().SyncParent(mgr.ctx, destinationPath); err != nil { + // If this fails, the log entry will be left in the write-ahead log but it is not + // properly persisted. If the fsync fails, something is seriously wrong and there's no + // point trying to delete the files. The right thing to do is to terminate Gitaly + // immediately as going further could cause data loss and corruption. This error check + // will later be replaced with a panic that terminates Gitaly. + // + // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 + return fmt.Errorf("sync log entry: %w", err) + } + mgr.appendedLSN = nextLSN - // After this sync, the log entry has been persisted and will be recovered on failure. - if err := safe.NewSyncer().SyncParent(ctx, destinationPath); err != nil { - // If this fails, the log entry will be left in the write-ahead log but it is not - // properly persisted. If the fsync fails, something is seriously wrong and there's no - // point trying to delete the files. The right thing to do is to terminate Gitaly - // immediately as going further could cause data loss and corruption. This error check - // will later be replaced with a panic that terminates Gitaly. - // - // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 - return 0, fmt.Errorf("sync log entry: %w", err) + return nil + }(); err != nil { + return 0, err } - mgr.appendedLSN = nextLSN - if mgr.consumer != nil { mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) } return nextLSN, nil } -func (mgr *Manager) createStateDirectory(ctx context.Context) error { +func (mgr *Manager) createStateDirectory() error { needsFsync := false for _, path := range []string{ + mgr.tmpDirectory, mgr.stateDirectory, filepath.Join(mgr.stateDirectory, "wal"), } { @@ -244,46 +294,73 @@ func (mgr *Manager) createStateDirectory(ctx context.Context) error { } syncer := safe.NewSyncer() - if err := syncer.SyncRecursive(ctx, mgr.stateDirectory); err != nil { + if err := syncer.SyncRecursive(mgr.ctx, mgr.stateDirectory); err != nil { return fmt.Errorf("sync: %w", err) } - if err := syncer.SyncParent(ctx, mgr.stateDirectory); err != nil { + if err := syncer.SyncParent(mgr.ctx, mgr.stateDirectory); err != nil { return fmt.Errorf("sync parent: %w", err) } return nil } -// PruneLogEntries prunes log entries from the Write-Ahead Log (WAL) that have been committed and are no longer -// needed. 
It ensures efficient storage management by removing redundant entries while maintaining the integrity of the -log sequence. The method respects the established low-water mark, ensuring no entries that might still be required -for transaction consistency are deleted. -func (mgr *Manager) PruneLogEntries(ctx context.Context) error { - mgr.mutex.Lock() - defer mgr.mutex.Unlock() +// pruneLogEntries schedules a task to prune log entries from the Write-Ahead Log (WAL) that have been committed and are +// no longer needed. It ensures efficient storage management by removing redundant entries while maintaining the +// integrity of the log sequence. The method respects the established low-water mark, ensuring no entries that might +// still be required for transaction consistency are deleted. The pruning task is performed in parallel so that the +// caller is unblocked instantly. It's guaranteed that there's only one pruning task running at a time. If +// there isn't one already, a new pruning task is initiated. +func (mgr *Manager) pruneLogEntries() { + select { + case <-mgr.ctx.Done(): + return + default: + } - // When a log entry is applied, if there is any log in front of it which are still referred, we cannot delete - // it. This condition is to prevent a "hole" in the list. A transaction referring to a log entry at the - // low-water mark might scan all afterward log entries. Thus, the manager needs to keep in the database. - // - // ┌── Consumer not acknowledged - // │ ┌─ Applied til this point - // Can remove │ │ ┌─ Free, but cannot be removed - // ◄───────────► │ │ │ - // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ - // └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ - // └─ oldestLSN ▲ - // │ - // Low-water mark - // - for mgr.oldestLSN < mgr.lowWaterMark() { - if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { - return fmt.Errorf("deleting log entry: %w", err) - } - mgr.oldestLSN++ + // Try to acquire the pruning mutex. If it cannot be acquired, another pruning task is running. + if pruning := mgr.pruningMutex.TryLock(); !pruning { + return } - return nil + + // Now we have the responsibility to initiate a new pruning task. We defer the Unlock() to ensure: + // - No concurrent goroutines can also perform pruning. + // - The goroutine below only begins pruning once this function exits and releases the mutex. + defer mgr.pruningMutex.Unlock() + + mgr.wg.Add(1) + go func() { + defer mgr.wg.Done() + + // This goroutine acquires the same mutex used above for task creation. This also guarantees that + // redundant goroutines (rare, but possible) wait until the prior one finishes. + mgr.pruningMutex.Lock() + defer mgr.pruningMutex.Unlock() + + // All log entries below the low-water mark can be removed. However, we would like to maintain the + // oldest LSN. The log entries must be removed in order and the oldestLSN advances one by one. This + // approach is to prevent a log entry from being forgotten if the manager fails to remove it in a prior + // session. + // + // ┌── Consumer not acknowledged + // │ ┌─ Applied til this point + // Can remove │ │ ┌─ Not consumed nor applied, cannot be removed.
+ // ◄───────────► │ │ │ + // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ + // └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ + // └─ oldestLSN ▲ + // │ + // Low-water mark + // + for mgr.oldestLSN < mgr.lowWaterMark() { + if err := mgr.deleteLogEntry(mgr.oldestLSN); err != nil { + err = fmt.Errorf("deleting log entry: %w", err) + mgr.notifyQueue <- err + return + } + mgr.oldestLSN++ + } + }() } // AppendedLSN returns the index of latest appended log entry. @@ -300,8 +377,8 @@ func (mgr *Manager) GetEntryPath(lsn storage.LSN) string { } // deleteLogEntry deletes the log entry at the given LSN from the log. -func (mgr *Manager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { - defer trace.StartRegion(ctx, "deleteLogEntry").End() +func (mgr *Manager) deleteLogEntry(lsn storage.LSN) error { + defer trace.StartRegion(mgr.ctx, "deleteLogEntry").End() tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") if err != nil { @@ -319,7 +396,7 @@ func (mgr *Manager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { return fmt.Errorf("rename: %w", err) } - if err := safe.NewSyncer().SyncParent(ctx, logEntryPath); err != nil { + if err := safe.NewSyncer().SyncParent(mgr.ctx, logEntryPath); err != nil { return fmt.Errorf("sync file deletion: %w", err) } @@ -336,6 +413,9 @@ func (mgr *Manager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { // lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than // this mark are removed. func (mgr *Manager) lowWaterMark() storage.LSN { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + minAcknowledged := mgr.appendedLSN + 1 // Position is the last acknowledged LSN, this is eligible for pruning. diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 27ca31a57e1336e463bde10f952df8437ae89c94..5df69f2deaaa07e343787c586daea94f75939172 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -14,7 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) -func appendLogEntry(t *testing.T, ctx context.Context, manager *Manager, files map[string][]byte) storage.LSN { +func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) storage.LSN { t.Helper() logEntryPath := testhelper.TempDir(t) @@ -23,7 +23,7 @@ func appendLogEntry(t *testing.T, ctx context.Context, manager *Manager, files m require.NoError(t, os.WriteFile(path, value, mode.File)) } - nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + nextLSN, err := manager.AppendLogEntry(logEntryPath) require.NoError(t, err) return nextLSN @@ -36,6 +36,18 @@ func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogCons return logManager } +func waitUntilPruningFinish(t *testing.T, manager *Manager) { + // Users of the log manager are not blocked while a pruning task runs. Log pruning runs in parallel and should not + // conflict with other activities of the log manager. In this test suite, we need to assert in-between states. Thus, the + // tests must wait until a pruning task finishes before asserting. The easiest solution is to wait on the manager's WaitGroup.
+ manager.wg.Wait() +} + +func assertDirectoryState(t *testing.T, manager *Manager, expected testhelper.DirectoryState) { + waitUntilPruningFinish(t, manager) + testhelper.RequireDirectoryState(t, manager.stateDirectory, "", expected) +} + func TestLogManager_Initialize(t *testing.T) { t.Parallel() @@ -47,33 +59,40 @@ func TestLogManager_Initialize(t *testing.T) { logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) + waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(0), logManager.appendedLSN) require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) + + require.NoError(t, logManager.Close()) }) t.Run("existing WAL entries without existing appliedLSN", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) + + waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(2), logManager.appendedLSN) require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -81,68 +100,113 @@ func TestLogManager_Initialize(t *testing.T) { "/wal/0000000000002": {Mode: mode.Directory}, "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, }) + require.NoError(t, logManager.Close()) }) t.Run("existing WAL entries with appliedLSN in-between", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) - require.NoError(t, logManager.createStateDirectory(ctx)) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } + require.NoError(t, logManager.Close()) logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 2)) - require.Equal(t, storage.LSN(1), logManager.oldestLSN) + 
waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(3), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: mode.Directory}, - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, "/wal/0000000000003": {Mode: mode.Directory}, "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, }) + require.NoError(t, logManager.Close()) }) t.Run("existing WAL entries with up-to-date appliedLSN", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } + require.NoError(t, logManager.Close()) - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 3)) - require.Equal(t, storage.LSN(1), logManager.oldestLSN) + waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(4), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: mode.Directory}, - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, }) + + require.NoError(t, logManager.Close()) + }) + + t.Run("double initialization error", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + // Attempt to initialize again + err := logManager.Initialize(ctx, 0) + require.Error(t, err) + require.Equal(t, "log manager already initialized", err.Error()) + + require.NoError(t, logManager.Close()) + }) + + t.Run("context canceled before initialization", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(testhelper.Context(t)) + cancel() // Cancel the context before initializing + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + err := logManager.Initialize(ctx, 0) + 
+ require.Error(t, err) + require.Equal(t, context.Canceled, err) + }) + + t.Run("context canceled after initialization", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(testhelper.Context(t)) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + // Cancel the context after initialization + cancel() + + // Check if the manager's context was also canceled + require.EqualError(t, logManager.ctx.Err(), context.Canceled.Error()) + require.NoError(t, logManager.Close()) }) } @@ -154,12 +218,13 @@ func TestLogManager_PruneLogEntries(t *testing.T) { ctx := testhelper.Context(t) logManager := setupLogManager(t, ctx, nil) - // Expect no entries to be removed - require.NoError(t, logManager.PruneLogEntries(ctx)) + // Set this entry as applied + waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(1), logManager.oldestLSN) // Assert on-disk state - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) @@ -171,25 +236,23 @@ func TestLogManager_PruneLogEntries(t *testing.T) { logManager := setupLogManager(t, ctx, nil) // Inject a single log entry - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - - // Set this entry as applied - logManager.AcknowledgeAppliedPosition(1) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) // Before removal - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, }) - // Attempt to remove applied log entries - require.NoError(t, logManager.PruneLogEntries(ctx)) + // Set this entry as applied + logManager.AcknowledgeAppliedPosition(1) + waitUntilPruningFinish(t, logManager) // After removal require.Equal(t, storage.LSN(2), logManager.oldestLSN) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) @@ -202,16 +265,11 @@ func TestLogManager_PruneLogEntries(t *testing.T) { // Inject multiple log entries for i := 0; i < 3; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } - // Set the applied LSN to 2 - logManager.AcknowledgeAppliedPosition(2) - // Manually set the consumer's position to the first entry, forcing low-water mark to retain it - logManager.AcknowledgeConsumerPosition(1) - // Before removal - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -222,11 +280,16 @@ func TestLogManager_PruneLogEntries(t *testing.T) { "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, }) - require.NoError(t, logManager.PruneLogEntries(ctx)) + // Set the applied LSN to 2 
+ logManager.AcknowledgeAppliedPosition(2) + // Manually set the consumer's position to the first entry, forcing low-water mark to retain it + logManager.AcknowledgeConsumerPosition(1) + + waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(2), logManager.oldestLSN) // Assert on-disk state to ensure no entries were removed - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000002": {Mode: mode.Directory}, @@ -243,14 +306,11 @@ func TestLogManager_PruneLogEntries(t *testing.T) { // Inject multiple log entries for i := 0; i < 5; i++ { - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } - // Set the applied LSN to 3, allowing the first three entries to be pruned - logManager.AcknowledgeAppliedPosition(3) - // Before removal - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -265,20 +325,120 @@ func TestLogManager_PruneLogEntries(t *testing.T) { "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, }) - require.NoError(t, logManager.PruneLogEntries(ctx)) + // Set the applied LSN to 3, allowing the first three entries to be pruned + logManager.AcknowledgeAppliedPosition(3) + waitUntilPruningFinish(t, logManager) // Ensure only entries starting from LSN 4 are retained require.Equal(t, storage.LSN(4), logManager.oldestLSN) // Assert on-disk state after removals + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000004": {Mode: mode.Directory}, + "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, + "/wal/0000000000005": {Mode: mode.Directory}, + "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, + }) + }) + + t.Run("log entry pruning fails", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + for i := 0; i < 5; i++ { + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + infectedPath := logManager.GetEntryPath(3) + + // Get the current permissions + info, err := os.Stat(infectedPath) + require.NoError(t, err) + originalMode := info.Mode() + + // Mark log entry 3 ready-only + require.NoError(t, os.Chmod(infectedPath, 0o444)) + + // The error is notified via notification queue so that the caller can act accordingly + logManager.AcknowledgeAppliedPosition(5) + require.ErrorContains(t, <-logManager.GetNotificationQueue(), "permission denied") + + require.NoError(t, logManager.Close()) + + // Restore the permission to assert the state + require.NoError(t, os.Chmod(infectedPath, originalMode)) testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: 
mode.File, Content: []byte("content-3")}, "/wal/0000000000004": {Mode: mode.Directory}, "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, "/wal/0000000000005": {Mode: mode.Directory}, "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, }) + + // Restart the manager + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 5)) + + waitUntilPruningFinish(t, logManager) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("trigger log entry pruning concurrently", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + var wg sync.WaitGroup + + const totalLSN = 25 + + // One producer goroutine + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < totalLSN; i++ { + appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + }() + + // Three goroutines spams acknowledgement constantly. + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + if logManager.AppendedLSN() == totalLSN { + return + } + logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN()) + } + }() + } + wg.Wait() + logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN()) + + require.NoError(t, logManager.Close()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) }) } @@ -293,10 +453,11 @@ func TestLogManager_AppendLogEntry(t *testing.T) { require.Equal(t, logManager.appendedLSN, storage.LSN(0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) require.Equal(t, logManager.appendedLSN, storage.LSN(1)) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -312,14 +473,15 @@ func TestLogManager_AppendLogEntry(t *testing.T) { require.Equal(t, logManager.appendedLSN, storage.LSN(0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{ + appendLogEntry(t, logManager, map[string][]byte{ "1": []byte("content-1"), "2": []byte("content-2"), "3": []byte("content-3"), }) require.Equal(t, logManager.appendedLSN, storage.LSN(1)) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -337,12 +499,13 @@ func TestLogManager_AppendLogEntry(t *testing.T) { require.Equal(t, logManager.appendedLSN, storage.LSN(0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2-1"), "2": []byte("content-2-2")}) - appendLogEntry(t, ctx, logManager, nil) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2-1"), "2": 
[]byte("content-2-2")}) + appendLogEntry(t, logManager, nil) require.Equal(t, logManager.appendedLSN, storage.LSN(3)) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -373,7 +536,6 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions := func(t *testing.T, logManager *Manager, consumed storage.LSN, applied storage.LSN) { logManager.AcknowledgeConsumerPosition(consumed) logManager.AcknowledgeAppliedPosition(applied) - require.NoError(t, logManager.PruneLogEntries(ctx)) } t.Run("consumer pos is set to 0 after initialized", func(t *testing.T) { @@ -383,7 +545,7 @@ func TestLogManager_Positions(t *testing.T) { require.Equal(t, [][]storage.LSN(nil), mockConsumer.positions) require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) @@ -398,8 +560,8 @@ func TestLogManager_Positions(t *testing.T) { logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) require.NoError(t, logManager.Initialize(ctx, 0)) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) // Apply to 3 but consume to 1 simulatePositions(t, logManager, 1, 2) @@ -407,10 +569,10 @@ func TestLogManager_Positions(t *testing.T) { require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) // Inject 3, 4 - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-4")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-4")}) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000002": {Mode: mode.Directory}, @@ -431,11 +593,12 @@ func TestLogManager_Positions(t *testing.T) { // Both consumer and applier catch up. simulatePositions(t, logManager, 4, 4) + waitUntilPruningFinish(t, logManager) // All log entries are pruned at this point. The consumer should not be notified again. 
require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions) require.Equal(t, storage.LSN(5), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) @@ -445,15 +608,15 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 0, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -467,15 +630,15 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 1, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000002": {Mode: mode.Directory}, @@ -487,15 +650,15 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 2, 0) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000001": {Mode: mode.Directory}, @@ -509,37 +672,40 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) simulatePositions(t, logManager, 1, 1) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, 
logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 2, 2) - // The oldest LSN changes after each acknowledgement - require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + logManager.pruneLogEntries() - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) + + // The oldest LSN changes after each acknowledgement + require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) }) t.Run("append while consumer is busy with prior entries", func(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) simulatePositions(t, logManager, 0, 1) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) simulatePositions(t, logManager, 0, 2) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) simulatePositions(t, logManager, 3, 3) require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) @@ -549,16 +715,16 @@ func TestLogManager_Positions(t *testing.T) { mockConsumer := &mockLogConsumer{} logManager := setupLogManager(t, ctx, mockConsumer) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) - appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) // 2 and 3 are not applied, hence kept intact. 
simulatePositions(t, logManager, 3, 1) require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000002": {Mode: mode.Directory}, @@ -569,9 +735,79 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions(t, logManager, 3, 3) require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) - testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + + assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, }) }) } + +func TestLogManager_Close(t *testing.T) { + t.Parallel() + + t.Run("close uninitialized manager", func(t *testing.T) { + t.Parallel() + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + + // Attempt to close the manager before initialization + err := logManager.Close() + require.Error(t, err) + require.Equal(t, "log manager has not been initialized", err.Error()) + }) + + t.Run("close after initialization", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + + // Properly initialize the manager + require.NoError(t, logManager.Initialize(ctx, 0)) + + // Close the manager + require.NoError(t, logManager.Close()) + + // Verify the context has been canceled + require.EqualError(t, logManager.ctx.Err(), context.Canceled.Error()) + }) + + t.Run("close after appending log entries", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Append some log entries + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"2": []byte("content-2")}) + + // Close the manager + require.NoError(t, logManager.Close()) + + // Verify the context has been canceled + require.EqualError(t, logManager.ctx.Err(), context.Canceled.Error()) + + // Further appending should fail due to the canceled context + _, err := logManager.AppendLogEntry(testhelper.TempDir(t)) + require.Error(t, err) + require.Equal(t, context.Canceled, err) + }) + + t.Run("close waits for pruning tasks", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Inject log entries + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"2": []byte("content-2")}) + + // Trigger pruning + logManager.AcknowledgeAppliedPosition(2) + + // Close the manager and ensure all tasks are completed + require.NoError(t, logManager.Close()) + + // Verify the oldestLSN after pruning + require.Equal(t, storage.LSN(3), logManager.oldestLSN) + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index ad56f3c5ae17bfe73bc6163781bdb9eff1cbac27..05a41952df1b8c7d1d844491586e41b6e8aed4a8 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -1990,6 +1990,11 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { returnedErr = errors.Join(returnedErr, fmt.Errorf("clean 
up worker: %w", err)) } }() + defer func() { + if err := mgr.logManager.Close(); err != nil { + returnedErr = errors.Join(returnedErr, fmt.Errorf("stopping log manager: %w", err)) + } + }() // Defer the Stop in order to release all on-going Commit calls in case of error. defer close(mgr.closed) defer mgr.Close() @@ -2008,10 +2013,6 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { continue } - if err := mgr.logManager.PruneLogEntries(mgr.ctx); err != nil { - return fmt.Errorf("pruning log entries: %w", err) - } - if err := mgr.processTransaction(ctx); err != nil { return fmt.Errorf("process transaction: %w", err) } @@ -2039,7 +2040,10 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned return errors.New("cleanup worker failed") case <-mgr.completedQueue: return nil - case <-mgr.logManager.GetNotificationQueue(): + case logErr := <-mgr.logManager.GetNotificationQueue(): + if logErr != nil { + return fmt.Errorf("log manager failed: %w", logErr) + } return nil case <-ctx.Done(): } @@ -2985,7 +2989,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende // After this latch block, the transaction is committed and all subsequent transactions // are guaranteed to read it. - appendedLSN, err := mgr.logManager.AppendLogEntry(ctx, logEntryPath) + appendedLSN, err := mgr.logManager.AppendLogEntry(logEntryPath) if err != nil { mgr.mutex.Lock() delete(mgr.snapshotLocks, mgr.logManager.AppendedLSN()+1) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index 17029b01082b30c139adfb49a8fe8f2e070e59e1..37246625020f12fd91453233f614b13067c28b12 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -377,20 +377,30 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti "refs/heads/third": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Third.OID}, }, }, - ConsumerAcknowledge{ - LSN: 3, - }, CloseManager{}, StartManager{}, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + // Wait until the first acknowledgement after restart + <-tm.logManager.GetNotificationQueue() + }), }, expectedState: StateAssertion{ Database: DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), }, - Directory: testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - }, + Directory: gittest.FilesOrReftables( + testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + // 1 and 2 were pruned before the manager was closed. + // 3 is not pruned because the consumer hasn't acknowledged it after the + // restart. 
+ "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/third", setup.Commits.Third.OID)), + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte(setup.Commits.Third.OID + "\n")}, + }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ + 3: {{"refs/heads/third": git.ReferenceUpdate{NewOID: setup.Commits.Third.OID}}}, + })), Repositories: RepositoryStates{ setup.RelativePath: { DefaultBranch: "refs/heads/main", diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index e5e9cefddfc8c4324daf67ae7d8993bcd7ed5631..45daec4e61455a6ec2e48df53a315c9b61fbc56d 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -2115,14 +2115,6 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t TransactionID: 4, ExpectedError: storage.ErrTransactionProcessingStopped, }, - AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - RequireDatabase(t, ctx, tm.db, DatabaseState{ - string(keyAppliedLSN): storage.LSN(3).ToProto(), - }) - // Transaction 2 and 3 are left-over. - require.NoDirExists(t, tm.logManager.GetEntryPath(2)) - require.NoDirExists(t, tm.logManager.GetEntryPath(3)) - }), StartManager{}, AssertManager{}, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { @@ -2134,6 +2126,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t require.Equal(t, tm.appliedLSN, storage.LSN(3)) require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(3)) }), + CloseManager{}, }, expectedState: StateAssertion{ Database: DatabaseState{ @@ -2263,19 +2256,22 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t manifestBytes, err := proto.Marshal(expectedManifest.Content.(proto.Message)) require.NoError(t, err) require.NoError(t, os.WriteFile(manifestPath(logEntryPath), manifestBytes, mode.File)) - require.NoError(t, tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, nil, logEntryPath)) + + logManager := log.NewManager(tm.storageName, setup.PartitionID, testhelper.TempDir(t), filepath.Join(tm.storagePath, "state"), setup.Consumer) + require.NoError(t, logManager.Initialize(ctx, 3)) + lsn, err := logManager.AppendLogEntry(logEntryPath) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(4)) RequireDatabase(t, ctx, tm.db, DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), }) - // Transaction 2 and 3 are left-over. - require.NoDirExists(t, tm.logManager.GetEntryPath(2)) - require.NoDirExists(t, tm.logManager.GetEntryPath(3)) - testhelper.RequireDirectoryState(t, tm.logManager.GetEntryPath(4), "", testhelper.DirectoryState{ + testhelper.RequireDirectoryState(t, logManager.GetEntryPath(4), "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/MANIFEST": expectedManifest, "/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, }) + require.NoError(t, logManager.Close()) }), StartManager{}, AssertManager{}, @@ -2287,6 +2283,10 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }) require.Equal(t, tm.appliedLSN, storage.LSN(4)) require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(4)) + + // Transaction 2 and 3 are left-over. 
+ require.NoDirExists(t, tm.logManager.GetEntryPath(2)) + require.NoDirExists(t, tm.logManager.GetEntryPath(3)) }), }, expectedState: StateAssertion{