From 6d0da209b6dab4efd9e88c54b5b9fb7ad964dac9 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 31 Dec 2024 17:41:20 +0700 Subject: [PATCH 1/2] storage: Add CompareAndAppendLogEntry to log manager This commit adds the CompareAndAppendLogEntry method to the log manager. This method is a variant of the existing AppendLogEntry. If the input LSN doesn't match the expected LSN, an error is returned. Although the caller can achieve a similar result by checking AppendedLSN() or the returned LSN of AppendLogEntry, this method offers better protection: - Asserting is performed before modification. - The lock is acquired before asserting. This prevents another concurrent writer from adding logs after the LSN is asserted. This new method will be used by components that need to insert log entries at specific LSNs, such as backup recovery and the Raft manager. --- .../storagemgr/partition/log/log_manager.go | 18 +- .../partition/log/log_manager_test.go | 193 +++++++++++++++++- 2 files changed, 208 insertions(+), 3 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index d66726360dd..58e54d1c10b 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -15,6 +15,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/safe" ) +// ErrLogEntryNotAppended is returned by CompareAndAppendLogEntry if the expected LSN +// doesn't match the latest appended LSN. +var ErrLogEntryNotAppended = errors.New("failed to append log entry: expected LSN does not match the latest appended LSN") + // StatePath returns the WAL directory's path. 
func StatePath(stateDir string) string { return filepath.Join(stateDir, "wal") @@ -196,17 +200,28 @@ func (mgr *Manager) Close() error { // moves the log entry's directory to the WAL, and returns its LSN once it has // been committed to the log. func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { + return mgr.CompareAndAppendLogEntry(0, logEntryPath) +} + +// CompareAndAppendLogEntry is a variant of AppendLogEntry. It appends the log entry to the write-ahead log if and only +// if the inserting position matches the expected LSN. +func (mgr *Manager) CompareAndAppendLogEntry(nextLSN storage.LSN, logEntryPath string) (storage.LSN, error) { select { case <-mgr.ctx.Done(): return 0, mgr.ctx.Err() default: } - nextLSN := mgr.appendedLSN + 1 if err := func() error { mgr.mutex.Lock() defer mgr.mutex.Unlock() + if nextLSN == 0 { + nextLSN = mgr.appendedLSN + 1 + } else if nextLSN != mgr.appendedLSN+1 { + return ErrLogEntryNotAppended + } + // Move the log entry from the staging directory into its place in the log. 
destinationPath := mgr.GetEntryPath(nextLSN) if err := os.Rename(logEntryPath, destinationPath); err != nil { @@ -234,6 +249,7 @@ func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { if mgr.consumer != nil { mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) } + return nextLSN, nil } diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 2324a359362..9956367dbb9 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -14,7 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) -func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) storage.LSN { +func setupEntryFiles(t *testing.T, files map[string][]byte) string { t.Helper() logEntryPath := testhelper.TempDir(t) @@ -23,7 +23,11 @@ func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) sto require.NoError(t, os.WriteFile(path, value, mode.File)) } - nextLSN, err := manager.AppendLogEntry(logEntryPath) + return logEntryPath +} + +func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) storage.LSN { + nextLSN, err := manager.AppendLogEntry(setupEntryFiles(t, files)) require.NoError(t, err) return nextLSN @@ -534,6 +538,191 @@ func TestLogManager_AppendLogEntry(t *testing.T) { }) } +func TestLogManager_CompareAndAppendLogEntry(t *testing.T) { + t.Parallel() + + t.Run("compare and append a log entry with a single file", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(1), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-1"), + }), + ) + require.NoError(t, err) + 
require.Equal(t, lsn, storage.LSN(1)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + }) + }) + + t.Run("compare and append a log entry with multiple files", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(1), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-1"), + "2": []byte("content-2"), + "3": []byte("content-3"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(1)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000001/2": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000001/3": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("append multiple entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(1), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-1"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(1)) + + lsn, err = logManager.CompareAndAppendLogEntry( + storage.LSN(2), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-2-1"), + "2": []byte("content-2-2"), + }), + ) + require.NoError(t, err) + require.Equal(t, 
lsn, storage.LSN(2)) + + lsn, err = logManager.CompareAndAppendLogEntry( + storage.LSN(3), + setupEntryFiles(t, map[string][]byte{}), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(3)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2-1")}, + "/wal/0000000000002/2": {Mode: mode.File, Content: []byte("content-2-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + }) + }) + + t.Run("compare and append a log entry at LSN 0", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + + lsn, err := logManager.CompareAndAppendLogEntry( + storage.LSN(0), + setupEntryFiles(t, map[string][]byte{ + "1": []byte("content-2"), + }), + ) + require.NoError(t, err) + require.Equal(t, lsn, storage.LSN(2)) + + require.Equal(t, logManager.appendedLSN, storage.LSN(2)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + for _, invalidCase := range []struct { + desc string + inputLSN storage.LSN + }{ + { + "compare and append a log entry at a LSN < appended LSN", 2, + }, + { + "compare and append a log entry at a LSN == appended LSN", 3, + }, + // Only appending at 
LSN 4 is successful. + { + "compare and append a log entry at a LSN > appended LSN + 1", 5, + }, + } { + t.Run(invalidCase.desc, func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + + _, err := logManager.CompareAndAppendLogEntry( + invalidCase.inputLSN, + setupEntryFiles(t, map[string][]byte{ + "1": []byte("should-not-append"), + }), + ) + require.ErrorIs(t, err, ErrLogEntryNotAppended) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + } +} + type mockLogConsumer struct { mu sync.Mutex positions [][]storage.LSN -- GitLab From baa50b1b69689c1351e7a6b2842ed3f5c3d4dae8 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 26 Dec 2024 11:21:41 +0700 Subject: [PATCH 2/2] storage: Expose some more methods of log.Manager This commit exposes some more methods of `log.Manager` via `storage.LogReader` and `storage.LogManager` interfaces. Those methods will be used by backup recovery and the Raft manager in upcoming commits, including: - LowWaterMark(): The Raft manager needs to access the low water mark as a part of etcd/raft integration. 
The library needs to query the boundary of logs under the management of the application. - DeleteLogEntry(): When Raft inserts a log entry, there's a chance that the log entry comes from another node with a higher term. In that case, the Raft manager needs to remove all entries having lower terms. --- internal/gitaly/storage/storage.go | 13 +++++-- .../storagemgr/partition/log/log_manager.go | 14 ++++---- .../partition/log/log_manager_test.go | 34 +++++++++---------- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index e773931a7d9..cf08719c91c 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -172,6 +172,13 @@ type LogReader interface { // AcknowledgePosition acknowledges log entries up and including lsn as successfully processed // for the specified position type. AcknowledgePosition(PositionType, LSN) error + + // AppendedLSN returns the LSN of the latest appended log entry. + AppendedLSN() LSN + + // LowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than + // this mark are removed. + LowWaterMark() LSN } // LogWriter adds entries to the Write-Ahead Log. @@ -179,6 +186,9 @@ type LogWriter interface { // AppendLogEntry appends an entry to the WAL. logEntryPath specifies the directory of the log entry. It returns // the Log Sequence Number (LSN) of the appended log entry. AppendLogEntry(logEntryPath string) (LSN, error) + + // DeleteLogEntry deletes the log entry at the given LSN from the log. + DeleteLogEntry(lsn LSN) error } // LogManager is the interface used to manage the underlying Write-Ahead Log entries. @@ -194,9 +204,6 @@ type LogManager interface { // is blocked until complete. Close() error - // AppendedLSN returns the LSN of the latest appended log entry. 
- AppendedLSN() LSN - // GetNotificationQueue returns a channel that is used to notify external components of changes. GetNotificationQueue() <-chan error } diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index 58e54d1c10b..95ebf2e3949 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -247,7 +247,7 @@ func (mgr *Manager) CompareAndAppendLogEntry(nextLSN storage.LSN, logEntryPath s } if mgr.consumer != nil { - mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) + mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.LowWaterMark(), nextLSN) } return nextLSN, nil @@ -336,8 +336,8 @@ func (mgr *Manager) pruneLogEntries() { // │ // Low-water mark // - for mgr.oldestLSN < mgr.lowWaterMark() { - if err := mgr.deleteLogEntry(mgr.oldestLSN); err != nil { + for mgr.oldestLSN < mgr.LowWaterMark() { + if err := mgr.DeleteLogEntry(mgr.oldestLSN); err != nil { err = fmt.Errorf("deleting log entry: %w", err) mgr.notifyQueue <- err return @@ -360,8 +360,8 @@ func (mgr *Manager) GetEntryPath(lsn storage.LSN) string { return EntryPath(mgr.stateDirectory, lsn) } -// deleteLogEntry deletes the log entry at the given LSN from the log. -func (mgr *Manager) deleteLogEntry(lsn storage.LSN) error { +// DeleteLogEntry deletes the log entry at the given LSN from the log. +func (mgr *Manager) DeleteLogEntry(lsn storage.LSN) error { defer trace.StartRegion(mgr.ctx, "deleteLogEntry").End() tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") @@ -394,9 +394,9 @@ func (mgr *Manager) deleteLogEntry(lsn storage.LSN) error { return nil } -// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than +// LowWaterMark returns the earliest LSN of log entries which should be kept in the database. 
Any log entries LESS than // this mark are removed. -func (mgr *Manager) lowWaterMark() storage.LSN { +func (mgr *Manager) LowWaterMark() storage.LSN { mgr.mutex.Lock() defer mgr.mutex.Unlock() diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 9956367dbb9..5ad660848c2 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -74,7 +74,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(0), logManager.appendedLSN) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -102,7 +102,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(2), logManager.appendedLSN) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -138,7 +138,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(3), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) - require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(3), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -170,7 +170,7 @@ func TestLogManager_Initialize(t *testing.T) { waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(4), logManager.oldestLSN) 
require.Equal(t, storage.LSN(3), logManager.appendedLSN) - require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -748,7 +748,7 @@ func TestLogManager_Positions(t *testing.T) { logManager := setupLogManager(t, ctx, mockConsumer) require.Equal(t, [][]storage.LSN(nil), mockConsumer.positions) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -771,7 +771,7 @@ func TestLogManager_Positions(t *testing.T) { // Apply to 3 but consume to 1 simulatePositions(t, logManager, 1, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) // Inject 3, 4 appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) @@ -802,7 +802,7 @@ func TestLogManager_Positions(t *testing.T) { // All log entries are pruned at this point. The consumer should not be notified again. 
require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions) - require.Equal(t, storage.LSN(5), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(5), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -819,7 +819,7 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions(t, logManager, 0, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -841,7 +841,7 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions(t, logManager, 1, 2) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -861,7 +861,7 @@ func TestLogManager_Positions(t *testing.T) { simulatePositions(t, logManager, 2, 0) require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -892,7 +892,7 @@ func TestLogManager_Positions(t *testing.T) { // The oldest LSN changes after each acknowledgement require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions) - require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(3), logManager.LowWaterMark()) }) t.Run("append while consumer is busy with prior entries", func(t *testing.T) { @@ -908,7 +908,7 @@ func TestLogManager_Positions(t *testing.T) { appendLogEntry(t, logManager, 
map[string][]byte{"1": []byte("content-3")}) simulatePositions(t, logManager, 3, 3) - require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -927,7 +927,7 @@ func TestLogManager_Positions(t *testing.T) { // 2 and 3 are not applied, hence kept intact. simulatePositions(t, logManager, 3, 1) - require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -939,7 +939,7 @@ func TestLogManager_Positions(t *testing.T) { }) simulatePositions(t, logManager, 3, 3) - require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, @@ -967,7 +967,7 @@ func TestLogManager_Positions(t *testing.T) { // Consumed = 3, Applied = 2, TestPosition1 = 1, testPosition2 = 1 simulatePositions(t, logManager, 3, 2) - require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(1), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -984,7 +984,7 @@ func TestLogManager_Positions(t *testing.T) { require.NoError(t, logManager.AcknowledgePosition(t2, 2)) simulatePositions(t, logManager, 3, 3) - require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(3), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -997,7 +997,7 @@ func TestLogManager_Positions(t *testing.T) { require.NoError(t, logManager.AcknowledgePosition(t2, 3)) simulatePositions(t, logManager, 3, 3) - 
require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + require.Equal(t, storage.LSN(4), logManager.LowWaterMark()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, -- GitLab