diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index e773931a7d9cf5fc9fe8308efd0b20e467035d5b..cf08719c91c91d49f3a0f2b9b88e9a8cd3fe816b 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -172,6 +172,13 @@ type LogReader interface { // AcknowledgePosition acknowledges log entries up and including lsn as successfully processed // for the specified position type. AcknowledgePosition(PositionType, LSN) error + + // AppendedLSN returns the LSN of the latest appended log entry. + AppendedLSN() LSN + + // LowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than + // this mark are removed. + LowWaterMark() LSN } // LogWriter adds entries to the Write-Ahead Log. @@ -179,6 +186,9 @@ type LogWriter interface { // AppendLogEntry appends an entry to the WAL. logEntryPath specifies the directory of the log entry. It returns // the Log Sequence Number (LSN) of the appended log entry. AppendLogEntry(logEntryPath string) (LSN, error) + + // DeleteLogEntry deletes the log entry at the given LSN from the log. + DeleteLogEntry(lsn LSN) error } // LogManager is the interface used to manage the underlying Write-Ahead Log entries. @@ -194,9 +204,6 @@ type LogManager interface { // is blocked until complete. Close() error - // AppendedLSN returns the LSN of the latest appended log entry. - AppendedLSN() LSN - // GetNotificationQueue returns a channel that is used to notify external components of changes. GetNotificationQueue() <-chan error } diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index d66726360dd451dc61eb216e97e82ee62a9ba72b..95ebf2e3949bbb3b381f54110da617e5113f0d4a 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -15,6 +15,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/safe" ) +// ErrLogEntryNotAppended is returned by CompareAndAppendLogEntry if the expected LSN +// doesn't match the latest appended LSN. +var ErrLogEntryNotAppended = errors.New("failed to append log entry: expected LSN does not match the latest appended LSN") + // StatePath returns the WAL directory's path. func StatePath(stateDir string) string { return filepath.Join(stateDir, "wal") @@ -196,17 +200,28 @@ func (mgr *Manager) Close() error { // moves the log entry's directory to the WAL, and returns its LSN once it has // been committed to the log. func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { + return mgr.CompareAndAppendLogEntry(0, logEntryPath) +} + +// CompareAndAppendLogEntry is a variant of AppendLogEntry. It appends the log entry to the write-ahead log if and only +// if the inserting position matches the expected LSN. +func (mgr *Manager) CompareAndAppendLogEntry(nextLSN storage.LSN, logEntryPath string) (storage.LSN, error) { select { case <-mgr.ctx.Done(): return 0, mgr.ctx.Err() default: } - nextLSN := mgr.appendedLSN + 1 if err := func() error { mgr.mutex.Lock() defer mgr.mutex.Unlock() + if nextLSN == 0 { + nextLSN = mgr.appendedLSN + 1 + } else if nextLSN != mgr.appendedLSN+1 { + return ErrLogEntryNotAppended + } + // Move the log entry from the staging directory into its place in the log. destinationPath := mgr.GetEntryPath(nextLSN) if err := os.Rename(logEntryPath, destinationPath); err != nil { @@ -232,8 +247,9 @@ func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { } if mgr.consumer != nil { - mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) + mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.LowWaterMark(), nextLSN) } + return nextLSN, nil } @@ -320,8 +336,8 @@ func (mgr *Manager) pruneLogEntries() { // │ // Low-water mark // - for mgr.oldestLSN < mgr.lowWaterMark() { - if err := mgr.deleteLogEntry(mgr.oldestLSN); err != nil { + for mgr.oldestLSN < mgr.LowWaterMark() { + if err := mgr.DeleteLogEntry(mgr.oldestLSN); err != nil { err = fmt.Errorf("deleting log entry: %w", err) mgr.notifyQueue <- err return @@ -344,8 +360,8 @@ func (mgr *Manager) GetEntryPath(lsn storage.LSN) string { return EntryPath(mgr.stateDirectory, lsn) } -// deleteLogEntry deletes the log entry at the given LSN from the log. -func (mgr *Manager) deleteLogEntry(lsn storage.LSN) error { +// DeleteLogEntry deletes the log entry at the given LSN from the log. +func (mgr *Manager) DeleteLogEntry(lsn storage.LSN) error { defer trace.StartRegion(mgr.ctx, "deleteLogEntry").End() tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") @@ -378,9 +394,9 @@ func (mgr *Manager) deleteLogEntry(lsn storage.LSN) error { return nil } -// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than +// LowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than // this mark are removed. -func (mgr *Manager) lowWaterMark() storage.LSN { +func (mgr *Manager) LowWaterMark() storage.LSN { mgr.mutex.Lock() defer mgr.mutex.Unlock() diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 2324a35936259bbc3cb9877bc354ef8e468407ff..5ad660848c2735b9e84b7b76a65ac93148ee886d 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -14,7 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) -func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) storage.LSN { +func setupEntryFiles(t *testing.T, files map[string][]byte) string { t.Helper() logEntryPath := testhelper.TempDir(t) @@ -23,7 +23,11 @@ func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) sto require.NoError(t, os.WriteFile(path, value, mode.File)) } - nextLSN, err := manager.AppendLogEntry(logEntryPath) + return logEntryPath +} + +func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) storage.LSN { + nextLSN, err := manager.AppendLogEntry(setupEntryFiles(t, files)) require.NoError(t, err) return nextLSN @@ -70,7 +74,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(0), logManager.appendedLSN) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -98,7 +102,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(2), logManager.appendedLSN) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -134,7 +138,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(3), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) - require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(3), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -166,7 +170,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(4), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) - require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -534,6 +538,191 @@ func TestLogManager_AppendLogEntry(t *testing.T) { }) } +func TestLogManager_CompareAndAppendLogEntry(t *testing.T) { + t.Parallel() + + t.Run("compare and append a log entry with a single file", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(1), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-1"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(1)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + }) + }) + + t.Run("compare and append a log entry with multiple files", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(1), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-1"), + "2": []byte("content-2"), + "3": []byte("content-3"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(1)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000001/2": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000001/3": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("append multiple entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(1), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-1"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(1)) + + lsn, err = logManager.CompareAndAppendLogEntry( + storage.LSN(2), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-2-1"), + "2": []byte("content-2-2"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(2)) + + lsn, err = logManager.CompareAndAppendLogEntry( + storage.LSN(3), + setupEntryFiles(t, map[string][]byte{}), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(3)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2-1")}, + "/wal/0000000000002/2": {Mode: mode.File, Content: []byte("content-2-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + }) + }) + + t.Run("compare and append a log entry at LSN 0", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(0), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-2"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(2)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(2)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + for _, invalidCase := range []struct { + desc string + inputLSN storage.LSN + }{ + { + "compare and append a log entry at a LSN < appended LSN", 2, + }, + { + "compare and append a log entry at a LSN == appended LSN", 3, + }, + // Only appending at LSN 4 is successful. + { + "compare and append a log entry at a LSN > appended LSN + 1", 5, + }, + } { + t.Run(invalidCase.desc, func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + + _, err := logManager.CompareAndAppendLogEntry( + invalidCase.inputLSN, + setupEntryFiles(t, map[string][]byte{ + "1": []byte("should-not-append"), + }), + ) + require.ErrorIs(t, err, ErrLogEntryNotAppended) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + } +} + type mockLogConsumer struct { mu sync.Mutex positions [][]storage.LSN @@ -559,7 +748,7 @@ func TestLogManager_Positions(t *testing.T) { logManager := setupLogManager(t, ctx, mockConsumer) require.Equal(t, [][]storage.LSN(nil), mockConsumer.positions) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -582,7 +771,7 @@ func TestLogManager_Positions(t *testing.T) { // Apply to 3 but consume to 1 simulatePositions(t, logManager, 1, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) // Inject 3, 4 appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) @@ -613,7 +802,7 @@ func TestLogManager_Positions(t *testing.T) { // All log entries are pruned at this point. The consumer should not be notified again. require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions) - require.Equal(t, storage.LSN(5), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(5), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -630,7 +819,7 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions(t, logManager, 0, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -652,7 +841,7 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions(t, logManager, 1, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -672,7 +861,7 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions(t, logManager, 2, 0) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -703,7 +892,7 @@ func TestLogManager_Positions(t *testing.T) { // The oldest LSN changes after each acknowledgement require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(3), logManager.LowWaterMark()) }) t.Run("append while consumer is busy with prior entries", func(t *testing.T) { @@ -719,7 +908,7 @@ func TestLogManager_Positions(t *testing.T) { appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) simulatePositions(t, logManager, 3, 3) - require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -738,7 +927,7 @@ func TestLogManager_Positions(t *testing.T) { // 2 and 3 are not applied, hence kept intact. simulatePositions(t, logManager, 3, 1) - require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -750,7 +939,7 @@ func TestLogManager_Positions(t *testing.T) { }) simulatePositions(t, logManager, 3, 3) - require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -778,7 +967,7 @@ func TestLogManager_Positions(t *testing.T) { // Consumed = 3, Applied = 2, TestPosition1 = 1, testPosition2 = 1 simulatePositions(t, logManager, 3, 2) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -795,7 +984,7 @@ func TestLogManager_Positions(t *testing.T) { require.NoError(t, logManager.AcknowledgePosition(t2, 2)) simulatePositions(t, logManager, 3, 3) - require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(3), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -808,7 +997,7 @@ func TestLogManager_Positions(t *testing.T) { require.NoError(t, logManager.AcknowledgePosition(t2, 3)) simulatePositions(t, logManager, 3, 3) - require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory},