From 84262870ae93de5698f93ff71b509d6792297846 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 16:36:46 +0700 Subject: [PATCH 01/15] storage: Remove redundant beforeDeleteLogEntryFiles hook This commit removes the beforeDeleteLogEntryFiles test hook. It is not used by any tests. --- .../storagemgr/partition/testhelper_test.go | 2 -- .../partition/transaction_manager.go | 26 +++++++++---------- .../transaction_manager_hook_test.go | 9 +++---- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index e7ef57edc7d..42eaab03a63 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -721,8 +721,6 @@ type testTransactionHooks struct { BeforeApplyLogEntry hookFunc // BeforeAppendLogEntry is called before a log entry is appended to the log. BeforeAppendLogEntry hookFunc - // AfterDeleteLogEntry is called after a log entry is deleted. - AfterDeleteLogEntry hookFunc // BeforeReadAppliedLSN is invoked before the applied LSN is read. BeforeReadAppliedLSN hookFunc // BeforeStoreAppliedLSN is invoked before the applied LSN is stored.
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 6d3d799007c..ee62a636ccb 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -1065,12 +1065,11 @@ type TransactionManager struct { } type testHooks struct { - beforeInitialization func() - beforeAppendLogEntry func() - beforeApplyLogEntry func() - beforeStoreAppliedLSN func() - beforeDeleteLogEntryFiles func() - beforeRunExiting func() + beforeInitialization func() + beforeAppendLogEntry func() + beforeApplyLogEntry func() + beforeStoreAppliedLSN func() + beforeRunExiting func() } // NewTransactionManager returns a new TransactionManager for the given repository. @@ -1123,12 +1122,11 @@ func NewTransactionManager( acknowledgedQueue: make(chan struct{}, 1), testHooks: testHooks{ - beforeInitialization: func() {}, - beforeAppendLogEntry: func() {}, - beforeApplyLogEntry: func() {}, - beforeStoreAppliedLSN: func() {}, - beforeDeleteLogEntryFiles: func() {}, - beforeRunExiting: func() {}, + beforeInitialization: func() {}, + beforeAppendLogEntry: func() {}, + beforeApplyLogEntry: func() {}, + beforeStoreAppliedLSN: func() {}, + beforeRunExiting: func() {}, }, } } @@ -1794,7 +1792,7 @@ func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, tran Name: "pack-refs", // By using the '--auto' flag, we ensure that git uses the best heuristic // for compaction. For reftables, it currently uses a geometric progression. - // This ensures we don't keep compacting unecessarily to a single file. + // This ensures we don't keep compacting unnecessarily to a single file. 
Flags: []gitcmd.Option{gitcmd.Flag{Name: "--auto"}}, }, gitcmd.WithStderr(&stderr)); err != nil { return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String()) @@ -2941,7 +2939,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. // // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 1b35818daee..e12f69d0897 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -25,11 +25,10 @@ type hookContext struct { // installHooks takes the hooks in the test setup and configures them in the TransactionManager. 
func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) { for destination, source := range map[*func()]hookFunc{ - &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, - &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, - &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, - &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, - &mgr.testHooks.beforeDeleteLogEntryFiles: hooks.AfterDeleteLogEntry, + &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, + &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, + &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, + &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, &mgr.testHooks.beforeRunExiting: func(hookContext) { if hooks.WaitForTransactionsWhenClosing { inflightTransactions.Wait() -- GitLab From 2f0880982cf2533ab99455950ed96c8a80745d23 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 18 Nov 2024 10:00:00 +0700 Subject: [PATCH 02/15] storage: Store referenced log entry in memory TransactionManager keeps a list of committed entries that are still referenced by other transactions. At the verification step, TransactionManager walks through the list and loads the respective log entries back into memory. This loading step is redundant and repetitive. A committed entry is possibly referenced by multiple transactions. When the TransactionManager iterates through the list, an entry is loaded and unmarshaled multiple times, leading to extra memory usage. The life span of an item in the list is likely to be short. Thus, it makes sense to keep the log entry in memory.
--- .../storagemgr/partition/transaction_manager.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index ee62a636ccb..bda143832d5 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -876,6 +876,8 @@ type snapshotLock struct { // committedEntry is a wrapper for a log entry. It is used to keep track of entries in which their snapshots are still // accessed by other transactions. type committedEntry struct { + // entry is the in-memory reflection of referenced log entry. + entry *gitalypb.LogEntry // lsn is the associated LSN of the entry lsn storage.LSN // snapshotReaders accounts for the number of transaction readers of the snapshot. @@ -3668,6 +3670,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} mgr.committedEntries.PushBack(&committedEntry{ lsn: nextLSN, + entry: logEntry, objectDependencies: objectDependencies, }) mgr.mutex.Unlock() @@ -3904,17 +3907,17 @@ func (mgr *TransactionManager) walkCommittedEntries(transaction *Transaction, ca if committed.lsn <= transaction.snapshotLSN { continue } - entry, err := mgr.readLogEntry(committed.lsn) - if err != nil { + + if committed.entry == nil { return errCommittedEntryGone } // Transaction manager works on the partition level, including a repository and all of its pool // member repositories (if any). We need to filter log entries of the repository this // transaction targets. 
- if entry.GetRelativePath() != transaction.relativePath { + if committed.entry.GetRelativePath() != transaction.relativePath { continue } - if err := callback(entry, committed.objectDependencies); err != nil { + if err := callback(committed.entry, committed.objectDependencies); err != nil { return fmt.Errorf("callback: %w", err) } } -- GitLab From b84635e29066eef0aaea4e94e47ed0930ccd1fc8 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 17:52:26 +0700 Subject: [PATCH 03/15] storage: Allow pruning log entries used by snapshots We used to take the head of committed entries list into account when determining which entries to be pruned. In a prior commit, we keep referenced log entry state in the memory. Thus, there's no need to prevent log from being pruned even if there are transactions with older read snapshots around. Recently, the TransactionManager has utilized a low watermark to limit which log entries should be pruned. This low watermark takes the head of committedEntries into account. Now, a log entry is pruned while it's still referenced by other transactions. Thus, we cannot use the low watermark anymore. The conflict manager now depends solely on the committed list to evict an LSN. Unfortunately, `cleanCommittedEntry` is called by some other goroutines. Thus, we need to use mutex to prevent concurrent calls to `Prepare()` and `Evict()`. This commit also set the initial consumer position to oldestLSN instead of zero. 
--- .../partition/transaction_manager.go | 33 ++++++++--------- .../transaction_manager_consumer_test.go | 36 ++++++------------- .../partition/transaction_manager_test.go | 34 +++++------------- 3 files changed, 34 insertions(+), 69 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index bda143832d5..6e676f57262 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -2296,14 +2296,6 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { return fmt.Errorf("deleting log entry: %w", err) } - - // The WAL entries are deleted only after there are no transactions using an - // older read snapshot than the LSN. It's also safe to drop the transaction - // from the conflict detection history as there are no transactions reading - // at an older snapshot. Since the changes are already in the transaction's - // snapshot, it would already base its changes on them. - mgr.conflictMgr.EvictLSN(ctx, mgr.oldestLSN) - mgr.oldestLSN++ continue } @@ -2394,6 +2386,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned // Prepare the transaction to conflict check it. We'll commit it later if we // succeed logging the transaction. 
+ mgr.mutex.Lock() preparedTX, err := mgr.conflictMgr.Prepare(ctx, &conflict.Transaction{ ReadLSN: transaction.SnapshotLSN(), TargetRelativePath: transaction.relativePath, @@ -2401,6 +2394,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned ZeroOID: zeroOID, ReferenceUpdates: transaction.referenceUpdates, }) + mgr.mutex.Unlock() if err != nil { return fmt.Errorf("prepare: %w", err) } @@ -2461,7 +2455,9 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned } // Commit the prepared transaction now that we've managed to commit the log entry. + mgr.mutex.Lock() preparedTX.Commit(ctx, mgr.appendedLSN) + mgr.mutex.Unlock() return nil }(); err != nil { @@ -2643,6 +2639,9 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { } if mgr.consumer != nil { + // Set acknowledged position to oldestLSN - 1 and notify the consumer from oldestLSN -> appendedLSN. + // If set the position to 0, the consumer is unable to read pruned entry anyway. + mgr.consumerPos.setPosition(mgr.oldestLSN - 1) mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) } @@ -3865,17 +3864,7 @@ func (mgr *TransactionManager) lowWaterMark() storage.LSN { } } - elm := mgr.committedEntries.Front() - if elm == nil { - return minConsumed - } - - committed := elm.Value.(*committedEntry).lsn - if minConsumed < committed { - return minConsumed - } - - return committed + return minConsumed } // updateCommittedEntry updates the reader counter of the committed entry of the snapshot that this transaction depends on. @@ -3941,6 +3930,12 @@ func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) bool { } mgr.committedEntries.Remove(elm) + + // It's safe to drop the transaction from the conflict detection history as there are no transactions + // reading at an older snapshot. 
Since the changes are already in the transaction's snapshot, it would + // already base its changes on them. + mgr.conflictMgr.EvictLSN(mgr.ctx, front.lsn) + removedAnyEntry = true elm = mgr.committedEntries.Front() } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index 0f3004e3a39..2b5cbf375e4 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -263,7 +263,7 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti }, }, { - desc: "dependent transaction blocks pruning acknowledged entry", + desc: "dependent transaction does not block pruning acknowledged entry", customSetup: customSetup, steps: steps{ StartManager{}, @@ -295,15 +295,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti Database: DatabaseState{ string(keyAppliedLSN): storage.LSN(1).ToProto(), }, - Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: mode.Directory}, - "/wal/0000000000001/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/main", setup.Commits.First.OID)), - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ - 1: {{"refs/heads/main": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, - })), + Directory: testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, Repositories: RepositoryStates{ setup.RelativePath: { DefaultBranch: "refs/heads/main", @@ -351,7 +346,7 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti }, }, { - desc: "consumer position zeroed lsn on 
restart", + desc: "consumer position set to oldest lsn on restart", customSetup: customSetup, steps: steps{ StartManager{}, @@ -408,19 +403,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti Database: DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), }, - Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/other", setup.Commits.Second.OID)), - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte(setup.Commits.Second.OID + "\n")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/third", setup.Commits.Third.OID)), - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte(setup.Commits.Third.OID + "\n")}, - }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ - 2: {{"refs/heads/other": git.ReferenceUpdate{NewOID: setup.Commits.Second.OID}}}, - 3: {{"refs/heads/third": git.ReferenceUpdate{NewOID: setup.Commits.Third.OID}}}, - })), + Directory: testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, Repositories: RepositoryStates{ setup.RelativePath: { DefaultBranch: "refs/heads/main", @@ -484,7 +470,7 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti }, }, Consumers: ConsumerState{ - ManagerPosition: 0, + ManagerPosition: 3, HighWaterMark: 3, }, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index edbe3e72654..f10a5248e5a 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -1723,15 +1723,12 @@ func 
generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t require.Equal(t, expected[i].snapshotReaders, actual.snapshotReaders) if expected[i].entry != nil { - actualEntry, err := manager.readLogEntry(actual.lsn) - require.NoError(t, err) - expectedEntry := expected[i].entry if testhelper.IsReftableEnabled() { for idx, op := range expectedEntry.GetOperations() { if chl := op.GetCreateHardLink(); chl != nil { - actualCHL := actualEntry.GetOperations()[idx].GetCreateHardLink() + actualCHL := actual.entry.GetOperations()[idx].GetCreateHardLink() require.NotNil(t, actualCHL) if filepath.Base(string(actualCHL.GetDestinationPath())) == "tables.list" { @@ -1746,7 +1743,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t } } - testhelper.ProtoEqual(t, expectedEntry, actualEntry) + testhelper.ProtoEqual(t, expectedEntry, actual.entry) } i++ } @@ -2117,19 +2114,11 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }) // Transaction 2 and 3 are left-over. 
testhelper.RequireDirectoryState(t, tm.stateDirectory, "", - gittest.FilesOrReftables(testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-1", setup.Commits.First.OID)), - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-2", setup.Commits.First.OID)), - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ - 2: {{"refs/heads/branch-1": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, - 3: {{"refs/heads/branch-2": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, - }))) + testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, + ) }), StartManager{}, AssertManager{}, @@ -2267,7 +2256,8 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t logEntryPath := filepath.Join(t.TempDir(), "log_entry") require.NoError(t, os.Mkdir(logEntryPath, mode.Directory)) require.NoError(t, os.WriteFile(filepath.Join(logEntryPath, "1"), []byte(setup.Commits.First.OID+"\n"), mode.File)) - require.NoError(t, tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath)) + err := tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath) + require.NoError(t, err) RequireDatabase(t, ctx, tm.db, DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), @@ -2276,12 +2266,6 @@ func generateCommittedEntriesTests(t *testing.T, 
setup testTransactionSetup) []t testhelper.RequireDirectoryState(t, tm.stateDirectory, "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-1", setup.Commits.First.OID)), - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-2", setup.Commits.First.OID)), - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, "/wal/0000000000004": {Mode: mode.Directory}, "/wal/0000000000004/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID)), "/wal/0000000000004/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, -- GitLab From c8591b9a5a7e614b4365cab8a8ba7c2aad5c4595 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 16:58:14 +0700 Subject: [PATCH 04/15] storage: Generalize consumerPos struct Recently, TransactionManager has an internal structure to track the latest LSN, which a consumer has acknowledged. The struct is named `consumerPos`. This commit renames it to a more generic name. In later commits, it will be used for something else. 
--- .../storagemgr/partition/testhelper_test.go | 6 +++- .../partition/transaction_manager.go | 31 +++++++++---------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 42eaab03a63..dbafd843c14 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1008,9 +1008,13 @@ type ConsumerState struct { } // RequireConsumer asserts the consumer log position is correct. -func RequireConsumer(t *testing.T, consumer LogConsumer, consumerPos *consumerPosition, expected ConsumerState) { +func RequireConsumer(t *testing.T, consumer LogConsumer, consumerPos *position, expected ConsumerState) { t.Helper() + if consumerPos == nil { + return + } + require.Equal(t, expected.ManagerPosition, consumerPos.getPosition(), "expected and actual manager position don't match") if consumer == nil { diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 6e676f57262..4986f1d0e29 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -18,6 +18,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -904,25 +905,23 @@ func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string { return walFilesPathForLSN(mgr.stateDirectory, lsn) } -// consumerPosition tracks the last LSN acknowledged for a consumer. -type consumerPosition struct { - // position is the last LSN acknowledged as completed by the consumer. - position storage.LSN - sync.Mutex +// position tracks the last LSN acknowledged for of a particular type. 
+type position struct { + lsn atomic.Value } -func (p *consumerPosition) getPosition() storage.LSN { - p.Lock() - defer p.Unlock() - - return p.position +func newPosition() *position { + p := position{} + p.setPosition(0) + return &p } -func (p *consumerPosition) setPosition(pos storage.LSN) { - p.Lock() - defer p.Unlock() +func (p *position) getPosition() storage.LSN { + return p.lsn.Load().(storage.LSN) +} - p.position = pos +func (p *position) setPosition(pos storage.LSN) { + p.lsn.Store(pos) } // TransactionManager is responsible for transaction management of a single repository. Each repository has @@ -1054,7 +1053,7 @@ type TransactionManager struct { // log entries. Log entries are retained until the consumer has acknowledged past their LSN. consumer LogConsumer // consumerPos tracks the largest LSN that has been acknowledged by consumer. - consumerPos *consumerPosition + consumerPos *position // acknowledgedQueue is a queue notifying when a transaction has been acknowledged. acknowledgedQueue chan struct{} @@ -1090,7 +1089,7 @@ func NewTransactionManager( ) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) - consumerPos := &consumerPosition{} + consumerPos := newPosition() cleanupWorkers := &errgroup.Group{} cleanupWorkers.SetLimit(25) -- GitLab From a2ed13262f33c0698c0298bb4815cdf51cde9ba0 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 17:06:58 +0700 Subject: [PATCH 05/15] storage: Improve ASCII flow charts In TransactionManager's code, there are some flow charts that demonstrate the position of indices. Those charts use some characters which are rendered differently between platforms. This commit replaces them with fixed-length character ones. 
--- .../partition/transaction_manager.go | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 4986f1d0e29..feb898984a2 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -371,9 +371,12 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti // entries. We signal only if the transaction modifies the in-memory committed entry. // This signal queue is buffered. If the queue is full, the manager hasn't woken up. The // next scan will cover the work of the prior one. So, no need to let the transaction wait. - // ┌─ 1st signal ┌─ The manager scans til here - // □ □ □ □ □ □ □ □ □ □ ■ ■ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ⧅ ⧅ ⧅ ⧅ ■ - // └─ 2nd signal + // + // ┌─ 1st signal ┌── The manager scans til here + // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ + // └─┘ └─┘ └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ + // └─ 2nd signal + // if removedAnyEntry { select { case mgr.completedQueue <- struct{}{}: @@ -2287,10 +2290,17 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { // it. This condition is to prevent a "hole" in the list. A transaction referring to a log entry at the // low-water mark might scan all afterward log entries. Thus, the manager needs to keep in the database. 
// - // ┌─ Oldest LSN - // ┌─ Can be removed ─┐ ┌─ Cannot be removed - // □ □ □ □ □ □ □ □ □ □ ■ ■ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ⧅ ⧅ ⧅ ⧅ ■ - // └─ Low-water mark, still referred by another transaction + // + // ┌── Consumer not acknowledged + // │ ┌─ Applied til this point + // Can remove │ │ ┌─ Free, but cannot be removed + // ◄───────────► │ │ │ + // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ + // └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ + // └─ oldestLSN ▲ + // │ + // Low-water mark + // if mgr.oldestLSN < mgr.lowWaterMark() { if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { return fmt.Errorf("deleting log entry: %w", err) @@ -2607,16 +2617,6 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("new snapshot manager: %w", err) } - // The LSN of the last appended log entry is determined from the LSN of the latest entry in the log and - // the latest applied log entry. The manager also keeps track of committed entries and reserves them until there - // is no transaction refers them. It's possible there are some left-over entries in the database because a - // transaction can hold the entry stubbornly. So, the manager could not clean them up in the last session. - // - // ┌─ oldestLSN ┌─ appendedLSN - // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ - // └─ appliedLSN - // - // // oldestLSN is initialized to appliedLSN + 1. If there are no log entries in the log, then everything has been // pruned already or there has not been any log entries yet. Setting this +1 avoids trying to clean up log entries // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. -- GitLab From 8c951b3243e3d9628f956cf89421b79f3219cf74 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 17:41:44 +0700 Subject: [PATCH 06/15] storage: Remove redundant stale file cleaning up function removeStaleWALFiles is no longer necessary these days as the log entry is self-contained in the directory.
Previously the manifest was in the database with the pack files only in the directory. In fact this entire function is likely unnecessary as log appending is an atomic rename, and the pruning old entries could just happen in the usual code paths. --- .../partition/transaction_manager.go | 46 ------------------- 1 file changed, 46 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index feb898984a2..1e9d2672d1b 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -2655,10 +2655,6 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})} } - if err := mgr.removeStaleWALFiles(ctx, mgr.oldestLSN, mgr.appendedLSN); err != nil { - return fmt.Errorf("remove stale packs: %w", err) - } - mgr.testHooks.beforeInitialization() mgr.initializationSuccessful = true @@ -2725,48 +2721,6 @@ func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string { return filepath.Join(append([]string{mgr.storagePath}, relativePath...)...) } -// removeStaleWALFiles removes files from the log directory that have no associated log entry. -// Such files can be left around if transaction's files were moved in place successfully -// but the manager was interrupted before successfully persisting the log entry itself. -// If the manager deletes a log entry successfully from the database but is interrupted before it cleans -// up the associated files, such a directory can also be left at the head of the log. -func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, oldestLSN, appendedLSN storage.LSN) error { - needsFsync := false - for _, possibleStaleFilesPath := range []string{ - // Log entries are pruned one by one. 
If a write is interrupted, the only possible stale files would be - // for the log entry preceding the oldest log entry. - walFilesPathForLSN(mgr.stateDirectory, oldestLSN-1), - // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale - // files would be for the next LSN. Remove the files if they exist. - walFilesPathForLSN(mgr.stateDirectory, appendedLSN+1), - } { - - if _, err := os.Stat(possibleStaleFilesPath); err != nil { - if !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("stat: %w", err) - } - - // No stale files were present. - continue - } - - if err := os.RemoveAll(possibleStaleFilesPath); err != nil { - return fmt.Errorf("remove all: %w", err) - } - - needsFsync = true - } - - if needsFsync { - // Sync the parent directory to flush the file deletion. - if err := safe.NewSyncer().Sync(ctx, walFilesPath(mgr.stateDirectory)); err != nil { - return fmt.Errorf("sync: %w", err) - } - } - - return nil -} - // walFilesPath returns the WAL directory's path. func walFilesPath(stateDir string) string { return filepath.Join(stateDir, "wal") -- GitLab From 277ef9b1a2652bb9f1cd1588476acab744a34c1d Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 26 Nov 2024 18:05:59 +0700 Subject: [PATCH 07/15] storage: Extract log management out of TransactionManager This commit refactors the log management within the TransactionManager by extracting it into a separate LogManager component. The LogManager is responsible for handling the write-ahead log (WAL), including management of its entries, state directories, and log entry application processes. Summary of changes: - Moved the logic related to WAL management from TransactionManager to a new LogManager component. - Defined clear boundaries for LogManager such as AppendLogEntry, Initialize, etc. encapsulate WAL functionality. - TransactionManager now holds a LogManager instance and delegates tasks related to WAL to this component, simplifying its own responsibilities. 
- Improved encapsulation and separation of concerns by isolating log management and transaction lifecycle operations. - Updated tests to align with the new structure and ensure the separation does not affect transaction operations. - Generalize tracking positions in LogManager. Applied position tracked along side with Consumer position. They are used to determine the safe pruning position. This commit keeps new LogManager located inside packagemgr package to avoid having too many moving parts. Log consumer is also passed from TransactionManager. --- .../storagemgr/partition/log_manager.go | 354 ++++++++++++++++++ .../storagemgr/partition/testhelper_test.go | 2 +- .../partition/transaction_manager.go | 295 +++------------ .../transaction_manager_hook_test.go | 8 +- .../partition/transaction_manager_test.go | 8 +- 5 files changed, 407 insertions(+), 260 deletions(-) create mode 100644 internal/gitaly/storage/storagemgr/partition/log_manager.go diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log_manager.go new file mode 100644 index 00000000000..027d129f89b --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log_manager.go @@ -0,0 +1,354 @@ +package partition + +import ( + "context" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "runtime/trace" + "sync" + "sync/atomic" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/safe" +) + +// walStatePath returns the WAL directory's path. +func walStatePath(stateDir string) string { + return filepath.Join(stateDir, "wal") +} + +// walFilesPathForLSN returns an absolute path to a given log entry's WAL files. 
+func walFilesPathForLSN(stateDir string, lsn storage.LSN) string { + return filepath.Join(walStatePath(stateDir), lsn.String()) +} + +type positionType int + +const ( + // appliedPosition keeps track of latest applied position. WAL cannot prune a log entry if it has not been applied. + appliedPosition positionType = iota + 1 + // consumerPosition keeps track of the latest consumer acknowledgement. + consumerPosition +) + +// position tracks the last LSN acknowledged for a particular type. +type position struct { + lsn atomic.Value +} + +func newPosition() *position { + p := position{} + p.setPosition(0) + return &p +} + +func (p *position) getPosition() storage.LSN { + return p.lsn.Load().(storage.LSN) +} + +func (p *position) setPosition(pos storage.LSN) { + p.lsn.Store(pos) +} + +type testLogHooks struct { + beforeAppendLogEntry func() +} + +// LogManager is responsible for managing the Write-Ahead Log (WAL) entries on disk. It maintains the in-memory state +// and indexing system that reflect the functional state of the WAL. The LogManager ensures safe and consistent +// proposals, applications, and prunings of log entries, acting as the interface for transactional log operations. It +// coordinates with LogConsumer to allow safe consumption of log entries while handling retention and cleanup based on +// references and acknowledgements. It effectively abstracts WAL operations from the TransactionManager, contributing to +// a cleaner separation of concerns and making the system more maintainable and extensible. +type LogManager struct { + // mutex protects access to critical states, especially `oldestLSN` and `appendedLSN`, as well as the integrity + // of inflight log entries. Since indices are monotonic, two parallel log appending operations result in pushing + // files into the same directory and breaking the manifest file. Thus, parallel log entry appending and pruning + // are not supported.
+ mutex sync.Mutex + + // storageName is the name of the storage the LogManager's partition is a member of. + storageName string + // partitionID is the ID of the partition this manager is operating on. + partitionID storage.PartitionID + + // tmpDirectory is the directory storing temporary data. One example is log entry deletion. WAL moves a log + // entry to this dir before removing it completely. + tmpDirectory string + // stateDirectory is an absolute path to a directory where write-ahead log stores log entries + stateDirectory string + + // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log. + appendedLSN storage.LSN + // oldestLSN holds the LSN of the head of log entries which is still kept in the database. The manager keeps + // them because they are still referred by a transaction. + oldestLSN storage.LSN + + // consumer is an external caller that may perform read-only operations against applied log entries. Log entries + // are retained until the consumer has acknowledged past their LSN. + consumer LogConsumer + // positions tracks the last acknowledged LSN per position type; WAL only prunes entries up to the lowest one. + positions map[positionType]*position + + // notifyQueue is a queue notifying when there is a new change. + notifyQueue chan struct{} + + // testHooks are used in the tests to trigger logic at certain points in the execution. + // They are used to synchronize more complex test scenarios. Not used in production. + testHooks testLogHooks +} + +// NewLogManager returns an instance of LogManager.
+func NewLogManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer LogConsumer) *LogManager { + positions := map[positionType]*position{ + appliedPosition: newPosition(), + } + if consumer != nil { + positions[consumerPosition] = newPosition() + } + return &LogManager{ + storageName: storageName, + partitionID: partitionID, + tmpDirectory: stagingDirectory, + stateDirectory: stateDirectory, + consumer: consumer, + positions: positions, + notifyQueue: make(chan struct{}, 1), + testHooks: testLogHooks{ + beforeAppendLogEntry: func() {}, + }, + } +} + +// Initialize sets up the initial state of the LogManager, preparing it to manage the write-ahead log entries. It takes +// the last applied LSN from the caller to resume from where it left off, creates necessary directories, and +// initializes in-memory tracking variables such as appendedLSN and oldestLSN based on the files present in the WAL +// directory. This method does not remove any stale log files; the entries found in the WAL directory are taken as +// they are. If a LogConsumer is present, it notifies it of the +// initial log entry state, enabling consumers to start processing from the correct point. Proper initialization is +// crucial for maintaining data consistency and ensuring that log entries are managed accurately upon system startup. +func (mgr *LogManager) Initialize(ctx context.Context, appliedLSN storage.LSN) error { + if err := mgr.createStateDirectory(ctx); err != nil { + return fmt.Errorf("create state directory: %w", err) + } + + // The LSN of the last appended log entry is determined from the LSN of the latest entry in the log and the + // latest applied log entry. As a log entry could be used by consumers (such as log consumer for backup) it + // needs to be retained until it is no longer used by any component. + // oldestLSN is initialized to appliedLSN + 1.
If there are no log entries in the log, then everything has been + // pruned already or there have not been any log entries yet. Setting this +1 avoids trying to clean up log entries + // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. + mgr.oldestLSN = appliedLSN + 1 + // appendedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or + // all log entries have been applied and have been already pruned. If there are some in the log, we'll update this + // below to match. + mgr.appendedLSN = appliedLSN + + if logEntries, err := os.ReadDir(walStatePath(mgr.stateDirectory)); err != nil { + return fmt.Errorf("read wal directory: %w", err) + } else if len(logEntries) > 0 { + if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { + return fmt.Errorf("parse oldest LSN: %w", err) + } + if mgr.appendedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { + return fmt.Errorf("parse appended LSN: %w", err) + } + } + + if mgr.consumer != nil { + // Set acknowledged position to oldestLSN - 1 and notify the consumer from oldestLSN -> appendedLSN. + // Setting the position to 0 would not help, as the consumer is unable to read pruned entries anyway. + mgr.AcknowledgeConsumerPosition(mgr.oldestLSN - 1) + mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) + } + mgr.AcknowledgeAppliedPosition(appliedLSN) + + return nil +} + +// AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. +func (mgr *LogManager) AcknowledgeAppliedPosition(lsn storage.LSN) { + mgr.positions[appliedPosition].setPosition(lsn) +} + +// AcknowledgeConsumerPosition acknowledges log entries up to and including LSN as successfully processed for the specified +// LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction.
+func (mgr *LogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { + if mgr.consumer == nil { + panic("log manager's consumer must be present prior to AcknowledgeConsumerPos call") + } + mgr.positions[consumerPosition].setPosition(lsn) + + // Alert the outsider. If it has a pending acknowledgement already no action is required. + select { + case mgr.notifyQueue <- struct{}{}: + default: + } +} + +// NotifyQueue returns a notify channel so that caller can poll new changes. +func (mgr *LogManager) NotifyQueue() <-chan struct{} { + return mgr.notifyQueue +} + +// AppendLogEntry appends an entry to the write-ahead log. logEntryPath is an +// absolute path to the directory that represents the log entry. appendLogEntry +// moves the log entry's directory to the WAL, and returns its LSN once it has +// been committed to the log. +func (mgr *LogManager) AppendLogEntry(ctx context.Context, logEntryPath string) (storage.LSN, error) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + nextLSN := mgr.appendedLSN + 1 + mgr.testHooks.beforeAppendLogEntry() + + // Move the log entry from the staging directory into its place in the log. + destinationPath := mgr.GetEntryPath(nextLSN) + if err := os.Rename(logEntryPath, destinationPath); err != nil { + return 0, fmt.Errorf("move wal files: %w", err) + } + + // After this sync, the log entry has been persisted and will be recovered on failure. + if err := safe.NewSyncer().SyncParent(ctx, destinationPath); err != nil { + // If this fails, the log entry will be left in the write-ahead log but it is not + // properly persisted. If the fsync fails, something is seriously wrong and there's no + // point trying to delete the files. The right thing to do is to terminate Gitaly + // immediately as going further could cause data loss and corruption. This error check + // will later be replaced with a panic that terminates Gitaly. 
+ // + // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 + return 0, fmt.Errorf("sync log entry: %w", err) + } + + mgr.appendedLSN = nextLSN + + if mgr.consumer != nil { + mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) + } + return nextLSN, nil +} + +func (mgr *LogManager) createStateDirectory(ctx context.Context) error { + needsFsync := false + for _, path := range []string{ + mgr.stateDirectory, + filepath.Join(mgr.stateDirectory, "wal"), + } { + err := os.Mkdir(path, mode.Directory) + switch { + case errors.Is(err, fs.ErrExist): + continue + case err != nil: + return fmt.Errorf("mkdir: %w", err) + } + + // The directory was created so we need to fsync. + needsFsync = true + } + + // If the directories already existed and we didn't create them, don't fsync. + if !needsFsync { + return nil + } + + syncer := safe.NewSyncer() + if err := syncer.SyncRecursive(ctx, mgr.stateDirectory); err != nil { + return fmt.Errorf("sync: %w", err) + } + + if err := syncer.SyncParent(ctx, mgr.stateDirectory); err != nil { + return fmt.Errorf("sync parent: %w", err) + } + + return nil +} + +// PruneLogEntries prunes log entries from the Write-Ahead Log (WAL) that have been committed and are no longer +// needed. It ensures efficient storage management by removing redundant entries while maintaining the integrity of the +// log sequence. The method respects the established low-water mark, ensuring no entries that might still be required +// for transaction consistency are deleted. +func (mgr *LogManager) PruneLogEntries(ctx context.Context) error { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // When a log entry is applied, if there is any log in front of it which are still referred, we cannot delete + // it. This condition is to prevent a "hole" in the list. A transaction referring to a log entry at the + // low-water mark might scan all afterward log entries. 
Thus, the manager needs to keep in the database. + // + // ┌── Consumer not acknowledged + // │ ┌─ Applied til this point + // Can remove │ │ ┌─ Free, but cannot be removed + // ◄───────────► │ │ │ + // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ + // └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ + // └─ oldestLSN ▲ + // │ + // Low-water mark + // + for mgr.oldestLSN < mgr.lowWaterMark() { + if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { + return fmt.Errorf("deleting log entry: %w", err) + } + mgr.oldestLSN++ + } + return nil +} + +// GetEntryPath returns the path of the log entry's root directory. +func (mgr *LogManager) GetEntryPath(lsn storage.LSN) string { + return walFilesPathForLSN(mgr.stateDirectory, lsn) +} + +// deleteLogEntry deletes the log entry at the given LSN from the log. +func (mgr *LogManager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { + defer trace.StartRegion(ctx, "deleteLogEntry").End() + + tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") + if err != nil { + return fmt.Errorf("mkdir temp: %w", err) + } + + logEntryPath := walFilesPathForLSN(mgr.stateDirectory, lsn) + // We can't delete a directory atomically as we have to first delete all of its content. + // If the deletion was interrupted, we'd be left with a corrupted log entry on the disk. + // To perform the deletion atomically, we move the to be deleted log entry out from the + // log into a temporary directory and sync the move. After that, the log entry is no longer + // in the log, and we can delete the files without having to worry about the deletion being + // interrupted and being left with a corrupted log entry. + if err := os.Rename(logEntryPath, filepath.Join(tmpDir, "to_delete")); err != nil { + return fmt.Errorf("rename: %w", err) + } + + if err := safe.NewSyncer().SyncParent(ctx, logEntryPath); err != nil { + return fmt.Errorf("sync file deletion: %w", err) + } + + // With the log entry removed from the log, we can now delete the files. 
There's no need + // to sync the deletions as the log entry is a temporary directory that will be removed + // on start up if they are left around from a crash. + if err := os.RemoveAll(tmpDir); err != nil { + return fmt.Errorf("remove files: %w", err) + } + + return nil +} + +// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than +// this mark are removed. +func (mgr *LogManager) lowWaterMark() storage.LSN { + minAcknowledged := mgr.appendedLSN + 1 + + // Position is the last acknowledged LSN, this is eligible for pruning. + // lowWaterMark returns the lowest LSN that cannot be pruned, so add one. + for _, pos := range mgr.positions { + if pos.getPosition()+1 < minAcknowledged { + minAcknowledged = pos.getPosition() + 1 + } + } + + return minAcknowledged +} diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index dbafd843c14..a5e23b694ff 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1575,7 +1575,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } } - RequireConsumer(t, transactionManager.consumer, transactionManager.consumerPos, tc.expectedState.Consumers) + RequireConsumer(t, transactionManager.logManager.consumer, transactionManager.logManager.positions[consumerPosition], tc.expectedState.Consumers) testhelper.RequireDirectoryState(t, stateDir, "", expectedDirectory) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 1e9d2672d1b..458b81fb813 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -18,7 +18,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" 
"syscall" "time" @@ -330,7 +329,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn := &Transaction{ write: opts.Write, commit: mgr.commit, - snapshotLSN: mgr.appendedLSN, + snapshotLSN: mgr.logManager.appendedLSN, finished: make(chan struct{}), relativePath: relativePath, metrics: mgr.metrics, @@ -412,7 +411,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti } if admitted { - // If the transcation was admitted, `.Run()` is responsible for cleaning the transaction up. + // If the transaction was admitted, `.Run()` is responsible for cleaning the transaction up. // Cleaning up the snapshots can take a relatively long time if the snapshots are large, or if // the file system is busy. To avoid blocking transaction processing, we us a pool of background // workers to clean up the transaction snapshots. @@ -894,37 +893,12 @@ type committedEntry struct { // for the specified LogConsumer. The manager is awakened if it is currently awaiting a new or // completed transaction. func (mgr *TransactionManager) AcknowledgeTransaction(lsn storage.LSN) { - mgr.consumerPos.setPosition(lsn) - - // Alert the manager. If it has a pending acknowledgement already no action is required. - select { - case mgr.acknowledgedQueue <- struct{}{}: - default: - } + mgr.logManager.AcknowledgeConsumerPosition(lsn) } // GetTransactionPath returns the path of the log entry's root directory. func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string { - return walFilesPathForLSN(mgr.stateDirectory, lsn) -} - -// position tracks the last LSN acknowledged for of a particular type. 
-type position struct { - lsn atomic.Value -} - -func newPosition() *position { - p := position{} - p.setPosition(0) - return &p -} - -func (p *position) getPosition() storage.LSN { - return p.lsn.Load().(storage.LSN) -} - -func (p *position) setPosition(pos storage.LSN) { - p.lsn.Store(pos) + return mgr.logManager.GetEntryPath(lsn) } // TransactionManager is responsible for transaction management of a single repository. Each repository has @@ -978,9 +952,6 @@ type TransactionManager struct { // being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly // releases awaiters when the transactions processing is stopped. closed chan struct{} - // stateDirectory is an absolute path to a directory where the TransactionManager stores the state related to its - // write-ahead log. - stateDirectory string // stagingDirectory is a path to a directory where this TransactionManager should stage the files of the transactions // before it logs them. The TransactionManager cleans up the files during runtime but stale files may be // left around after crashes. The files are temporary and any leftover files are expected to be cleaned up when @@ -999,6 +970,8 @@ type TransactionManager struct { partitionID storage.PartitionID // db is the handle to the key-value store used for storing the write-ahead log related state. db keyvalue.Transactioner + // logManager manages the underlying Write-Ahead Log entries. + logManager *LogManager // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction @@ -1033,14 +1006,8 @@ type TransactionManager struct { // conflictMgr is responsible for checking concurrent transactions against each other for conflicts. conflictMgr *conflict.Manager - // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log. 
- appendedLSN storage.LSN // appliedLSN holds the LSN of the last log entry applied to the partition. appliedLSN storage.LSN - // oldestLSN holds the LSN of the head of log entries which is still kept in the database. The manager keeps - // them because they are still referred by a transaction. - oldestLSN storage.LSN - // awaitingTransactions contains transactions waiting for their log entry to be applied to // the partition. It's keyed by the LSN the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. @@ -1052,14 +1019,6 @@ type TransactionManager struct { // the corresponding snapshots. committedEntries *list.List - // consumer is an the external caller that may perform read-only operations against applied - // log entries. Log entries are retained until the consumer has acknowledged past their LSN. - consumer LogConsumer - // consumerPos tracks the largest LSN that has been acknowledged by consumer. - consumerPos *position - // acknowledgedQueue is a queue notifying when a transaction has been acknowledged. - acknowledgedQueue chan struct{} - // testHooks are used in the tests to trigger logic at certain points in the execution. // They are used to synchronize more complex test scenarios. Not used in production. 
testHooks testHooks @@ -1070,7 +1029,6 @@ type TransactionManager struct { type testHooks struct { beforeInitialization func() - beforeAppendLogEntry func() beforeApplyLogEntry func() beforeStoreAppliedLSN func() beforeRunExiting func() @@ -1092,8 +1050,6 @@ func NewTransactionManager( ) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) - consumerPos := newPosition() - cleanupWorkers := &errgroup.Group{} cleanupWorkers.SetLimit(25) @@ -1109,25 +1065,21 @@ func NewTransactionManager( storagePath: storagePath, partitionID: ptnID, db: db, + logManager: NewLogManager(storageName, ptnID, stagingDir, stateDir, consumer), admissionQueue: make(chan *Transaction), completedQueue: make(chan struct{}, 1), initialized: make(chan struct{}), snapshotLocks: make(map[storage.LSN]*snapshotLock), conflictMgr: conflict.NewManager(), - stateDirectory: stateDir, stagingDirectory: stagingDir, cleanupWorkers: cleanupWorkers, cleanupWorkerFailed: make(chan struct{}), awaitingTransactions: make(map[storage.LSN]resultChannel), committedEntries: list.New(), metrics: metrics, - consumer: consumer, - consumerPos: consumerPos, - acknowledgedQueue: make(chan struct{}, 1), testHooks: testHooks{ beforeInitialization: func() {}, - beforeAppendLogEntry: func() {}, beforeApplyLogEntry: func() {}, beforeStoreAppliedLSN: func() {}, beforeRunExiting: func() {}, @@ -2276,7 +2228,7 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { } for { - if mgr.appliedLSN < mgr.appendedLSN { + if mgr.appliedLSN < mgr.logManager.appendedLSN { lsn := mgr.appliedLSN + 1 if err := mgr.applyLogEntry(ctx, lsn); err != nil { @@ -2286,27 +2238,8 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { continue } - // When a log entry is applied, if there is any log in front of it which are still referred, we cannot delete - // it. This condition is to prevent a "hole" in the list. 
A transaction referring to a log entry at the - // low-water mark might scan all afterward log entries. Thus, the manager needs to keep in the database. - // - // - // ┌── Consumer not acknowledged - // │ ┌─ Applied til this point - // Can remove │ │ ┌─ Free, but cannot be removed - // ◄───────────► │ │ │ - // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ - // └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ - // └─ oldestLSN ▲ - // │ - // Low-water mark - // - if mgr.oldestLSN < mgr.lowWaterMark() { - if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { - return fmt.Errorf("deleting log entry: %w", err) - } - mgr.oldestLSN++ - continue + if err := mgr.logManager.PruneLogEntries(mgr.ctx); err != nil { + return fmt.Errorf("pruning log entries: %w", err) } if err := mgr.processTransaction(ctx); err != nil { @@ -2336,7 +2269,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned return errors.New("cleanup worker failed") case <-mgr.completedQueue: return nil - case <-mgr.acknowledgedQueue: + case <-mgr.logManager.NotifyQueue(): return nil case <-ctx.Done(): } @@ -2465,7 +2398,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned // Commit the prepared transaction now that we've managed to commit the log entry. mgr.mutex.Lock() - preparedTX.Commit(ctx, mgr.appendedLSN) + preparedTX.Commit(ctx, mgr.logManager.appendedLSN) mgr.mutex.Unlock() return nil @@ -2474,7 +2407,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned return nil } - mgr.awaitingTransactions[mgr.appendedLSN] = transaction.result + mgr.awaitingTransactions[mgr.logManager.appendedLSN] = transaction.result return nil } @@ -2590,8 +2523,9 @@ func (mgr *TransactionManager) snapshotsDir() string { return filepath.Join(mgr.stagingDirectory, "snapshots") } -// initialize initializes the TransactionManager's state from the database. 
It loads the appended and the applied -// LSNs and initializes the notification channels that synchronize transaction beginning with log entry applying. +// initialize initializes the TransactionManager's state from the database. It initializes WAL log manager and the +// applied LSNs and initializes the notification channels that synchronize transaction beginning with log entry +// applying. func (mgr *TransactionManager) initialize(ctx context.Context) error { defer trace.StartRegion(ctx, "initialize").End() @@ -2604,8 +2538,8 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { mgr.appliedLSN = storage.LSN(appliedLSN.GetValue()) - if err := mgr.createStateDirectory(ctx); err != nil { - return fmt.Errorf("create state directory: %w", err) + if err := mgr.logManager.Initialize(ctx, mgr.appliedLSN); err != nil { + return fmt.Errorf("initialize log management: %w", err) } if err := os.Mkdir(mgr.snapshotsDir(), mode.Directory); err != nil { @@ -2617,33 +2551,6 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("new snapshot manager: %w", err) } - // oldestLSN is initialized to appliedLSN + 1. If there are no log entries in the log, then everything has been - // pruned already or there has not been any log entries yet. Setting this +1 avoids trying to clean up log entries - // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. - mgr.oldestLSN = mgr.appliedLSN + 1 - // appendedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or - // all log entries have been applied and have been already pruned. If there are some in the log, we'll update this - // below to match. 
- mgr.appendedLSN = mgr.appliedLSN - - if logEntries, err := os.ReadDir(walFilesPath(mgr.stateDirectory)); err != nil { - return fmt.Errorf("read wal directory: %w", err) - } else if len(logEntries) > 0 { - if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { - return fmt.Errorf("parse oldest LSN: %w", err) - } - if mgr.appendedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { - return fmt.Errorf("parse appended LSN: %w", err) - } - } - - if mgr.consumer != nil { - // Set acknowledged position to oldestLSN - 1 and notify the consumer from oldestLSN -> appendedLSN. - // If set the position to 0, the consumer is unable to read pruned entry anyway. - mgr.consumerPos.setPosition(mgr.oldestLSN - 1) - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) - } - // Create a snapshot lock for the applied LSN as it is used for synchronizing // the snapshotters with the log application. mgr.snapshotLocks[mgr.appliedLSN] = &snapshotLock{applied: make(chan struct{})} @@ -2651,7 +2558,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // Each unapplied log entry should have a snapshot lock as they are created in normal // operation when committing a log entry. Recover these entries. 
- for i := mgr.appliedLSN + 1; i <= mgr.appendedLSN; i++ { + for i := mgr.appliedLSN + 1; i <= mgr.logManager.appendedLSN; i++ { mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})} } @@ -2681,56 +2588,11 @@ func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relative return true, nil } -func (mgr *TransactionManager) createStateDirectory(ctx context.Context) error { - needsFsync := false - for _, path := range []string{ - mgr.stateDirectory, - filepath.Join(mgr.stateDirectory, "wal"), - } { - if err := os.Mkdir(path, mode.Directory); err != nil { - if !errors.Is(err, fs.ErrExist) { - return fmt.Errorf("mkdir: %w", err) - } - - continue - } - - // The directory was created so we need to fsync. - needsFsync = true - } - - // If the directories already existed and we didn't create them, don't fsync. - if !needsFsync { - return nil - } - - syncer := safe.NewSyncer() - if err := syncer.SyncRecursive(ctx, mgr.stateDirectory); err != nil { - return fmt.Errorf("sync: %w", err) - } - - if err := syncer.SyncParent(ctx, mgr.stateDirectory); err != nil { - return fmt.Errorf("sync parent: %w", err) - } - - return nil -} - // getAbsolutePath returns the relative path's absolute path in the storage. func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string { return filepath.Join(append([]string{mgr.storagePath}, relativePath...)...) } -// walFilesPath returns the WAL directory's path. -func walFilesPath(stateDir string) string { - return filepath.Join(stateDir, "wal") -} - -// walFilesPathForLSN returns an absolute path to a given log entry's WAL files. -func walFilesPathForLSN(stateDir string, lsn storage.LSN) string { - return filepath.Join(walFilesPath(stateDir), lsn.String()) -} - // manifestPath returns the manifest file's path in the log entry. 
func manifestPath(logEntryPath string) string { return filepath.Join(logEntryPath, "MANIFEST") @@ -3558,9 +3420,8 @@ func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, ch return nil } -// appendLogEntry appends the transaction to the write-ahead log. It first writes the transaction's manifest file -// into the log entry's directory. Afterwards it moves the log entry's directory from the staging area to its final -// place in the write-ahead log. +// appendLogEntry appends a log entry of a transaction to the write-ahead log. After the log entry is appended to WAL, +// the corresponding snapshot lock and in-memory reference for the latest appended LSN are created. func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { defer trace.StartRegion(ctx, "appendLogEntry").End() @@ -3590,46 +3451,29 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende return fmt.Errorf("synchronizing WAL directory: %w", err) } - mgr.testHooks.beforeAppendLogEntry() - - nextLSN := mgr.appendedLSN + 1 - // Move the log entry from the staging directory into its place in the log. - destinationPath := walFilesPathForLSN(mgr.stateDirectory, nextLSN) - if err := os.Rename(logEntryPath, destinationPath); err != nil { - return fmt.Errorf("move wal files: %w", err) - } - - // Sync the WAL directory. The manifest has been synced above, and all of the other files - // have been synced before queuing for commit. At this point we just have to sync the - // directory entry of the new log entry in the WAL directory to finalize the commit. - // - // After this sync, the log entry has been persisted and will be recovered on failure. - if err := safe.NewSyncer().SyncParent(ctx, destinationPath); err != nil { - // If this fails, the log entry will be left in the write-ahead log but it is not - // properly persisted. 
If the fsync fails, something is seriously wrong and there's no - // point trying to delete the files. The right thing to do is to terminate Gitaly - // immediately as going further could cause data loss and corruption. This error check - // will later be replaced with a panic that terminates Gitaly. - // - // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 - return fmt.Errorf("sync log entry: %w", err) - } + // Pre-setup a snapshot lock entry for the assumed appended LSN location. + mgr.mutex.Lock() + mgr.snapshotLocks[mgr.logManager.appendedLSN+1] = &snapshotLock{applied: make(chan struct{})} + mgr.mutex.Unlock() // After this latch block, the transaction is committed and all subsequent transactions // are guaranteed to read it. + appendedLSN, err := mgr.logManager.AppendLogEntry(ctx, logEntryPath) + if err != nil { + mgr.mutex.Lock() + delete(mgr.snapshotLocks, mgr.logManager.appendedLSN+1) + mgr.mutex.Unlock() + return fmt.Errorf("append log entry: %w", err) + } + mgr.mutex.Lock() - mgr.appendedLSN = nextLSN - mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} mgr.committedEntries.PushBack(&committedEntry{ - lsn: nextLSN, + lsn: appendedLSN, entry: logEntry, objectDependencies: objectDependencies, }) mgr.mutex.Unlock() - if mgr.consumer != nil { - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) - } return nil } @@ -3656,7 +3500,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS mgr.testHooks.beforeApplyLogEntry() if err := mgr.db.Update(func(tx keyvalue.ReadWriter) error { - if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, walFilesPathForLSN(mgr.stateDirectory, lsn), logEntry.GetOperations(), tx); err != nil { + if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, mgr.logManager.GetEntryPath(lsn), logEntry.GetOperations(), tx); err != nil { return fmt.Errorf("apply operations: %w", err) } 
@@ -3665,12 +3509,9 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS return fmt.Errorf("update: %w", err) } - mgr.testHooks.beforeStoreAppliedLSN() if err := mgr.storeAppliedLSN(lsn); err != nil { return fmt.Errorf("set applied LSN: %w", err) } - - mgr.appliedLSN = lsn mgr.snapshotManager.SetLSN(lsn) // There is no awaiter for a transaction if the transaction manager is recovering @@ -3715,43 +3556,9 @@ func (mgr *TransactionManager) createRepository(ctx context.Context, repositoryP return nil } -// deleteLogEntry deletes the log entry at the given LSN from the log. -func (mgr *TransactionManager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { - defer trace.StartRegion(ctx, "deleteLogEntry").End() - - tmpDir, err := os.MkdirTemp(mgr.stagingDirectory, "") - if err != nil { - return fmt.Errorf("mkdir temp: %w", err) - } - - logEntryPath := walFilesPathForLSN(mgr.stateDirectory, lsn) - // We can't delete a directory atomically as we have to first delete all of its content. - // If the deletion was interrupted, we'd be left with a corrupted log entry on the disk. - // To perform the deletion atomically, we move the to be deleted log entry out from the - // log into a temporary directory and sync the move. After that, the log entry is no longer - // in the log, and we can delete the files without having to worry about the deletion being - // interrupted and being left with a corrupted log entry. - if err := os.Rename(logEntryPath, filepath.Join(tmpDir, "to_delete")); err != nil { - return fmt.Errorf("rename: %w", err) - } - - if err := safe.NewSyncer().SyncParent(ctx, logEntryPath); err != nil { - return fmt.Errorf("sync file deletion: %w", err) - } - - // With the log entry removed from the log, we can now delete the files. There's no need - // to sync the deletions as the log entry is a temporary directory that will be removed - // on start up if they are left around from a crash. 
- if err := os.RemoveAll(tmpDir); err != nil { - return fmt.Errorf("remove files: %w", err) - } - - return nil -} - // readLogEntry returns the log entry from the given position in the log. func (mgr *TransactionManager) readLogEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { - manifestBytes, err := os.ReadFile(manifestPath(walFilesPathForLSN(mgr.stateDirectory, lsn))) + manifestBytes, err := os.ReadFile(manifestPath(mgr.logManager.GetEntryPath(lsn))) if err != nil { return nil, fmt.Errorf("read manifest: %w", err) } @@ -3766,7 +3573,14 @@ func (mgr *TransactionManager) readLogEntry(lsn storage.LSN) (*gitalypb.LogEntry // storeAppliedLSN stores the partition's applied LSN in the database. func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error { - return mgr.setKey(keyAppliedLSN, lsn.ToProto()) + mgr.testHooks.beforeStoreAppliedLSN() + + if err := mgr.setKey(keyAppliedLSN, lsn.ToProto()); err != nil { + return err + } + mgr.logManager.AcknowledgeAppliedPosition(lsn) + mgr.appliedLSN = lsn + return nil } // setKey marshals and stores a given protocol buffer message into the database under the given key. @@ -3799,27 +3613,6 @@ func (mgr *TransactionManager) readKey(key []byte, destination proto.Message) er }) } -// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than -// this mark are removed. -func (mgr *TransactionManager) lowWaterMark() storage.LSN { - mgr.mutex.Lock() - defer mgr.mutex.Unlock() - - // Greater than the maximum position of any consumer. - minConsumed := mgr.appliedLSN + 1 - - if mgr.consumer != nil { - // Position is the last acknowledged LSN, this is eligible for pruning. - // lowWaterMark returns the lowest LSN that cannot be pruned, so add one. 
- pos := mgr.consumerPos.getPosition() + 1 - if pos < minConsumed { - minConsumed = pos - } - } - - return minConsumed -} - // updateCommittedEntry updates the reader counter of the committed entry of the snapshot that this transaction depends on. func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN storage.LSN) *committedEntry { // Since the goroutine doing this is holding the lock, the snapshotLSN shouldn't change and no new transactions diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index e12f69d0897..15ec82f1e25 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -25,10 +25,10 @@ type hookContext struct { // installHooks takes the hooks in the test setup and configures them in the TransactionManager. func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) { for destination, source := range map[*func()]hookFunc{ - &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, - &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, - &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, - &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, + &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, + &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, + &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, + &mgr.logManager.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, &mgr.testHooks.beforeRunExiting: func(hookContext) { if hooks.WaitForTransactionsWhenClosing { inflightTransactions.Wait() diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 
f10a5248e5a..e9de6ae064c 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -2113,7 +2113,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) // Transaction 2 and 3 are left-over. - testhelper.RequireDirectoryState(t, tm.stateDirectory, "", + testhelper.RequireDirectoryState(t, tm.logManager.stateDirectory, "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -2129,7 +2129,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(3)) - require.Equal(t, tm.appendedLSN, storage.LSN(3)) + require.Equal(t, tm.logManager.appendedLSN, storage.LSN(3)) }), }, expectedState: StateAssertion{ @@ -2263,7 +2263,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) // Transaction 2 and 3 are left-over. 
- testhelper.RequireDirectoryState(t, tm.stateDirectory, "", testhelper.DirectoryState{ + testhelper.RequireDirectoryState(t, tm.logManager.stateDirectory, "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000004": {Mode: mode.Directory}, @@ -2280,7 +2280,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(4).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(4)) - require.Equal(t, tm.appendedLSN, storage.LSN(4)) + require.Equal(t, tm.logManager.appendedLSN, storage.LSN(4)) }), }, expectedState: StateAssertion{ -- GitLab From 7e7a51222e52c689fcb33acc9b25ddb4dbe5931c Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 19 Nov 2024 20:41:44 +0700 Subject: [PATCH 08/15] storage: Improve test coverage of LogManager Prior commits move code around with minimal test modification. That approach is intentional to keep the size of commits small and thus easier to review and spot errors. As a result, some functionality of LogManager is asserted by TransactionManager's tests indirectly. Those tests access the internal states of LogManager. This commit improves the test coverage of LogManager by adding more unit tests for each exposed function. Although LogManager's unit tests cover more scenarios, counterparts in TransactionManager help verify the interaction between TransactionManager and LogManager. Thus, they are simplified so that the test suite asserts the outcomes instead of internal states. 
--- .../storagemgr/partition/log_manager.go | 2 +- .../storagemgr/partition/log_manager_test.go | 576 ++++++++++++++++++ .../storagemgr/partition/testhelper_test.go | 35 -- .../transaction_manager_consumer_test.go | 24 - 4 files changed, 577 insertions(+), 60 deletions(-) create mode 100644 internal/gitaly/storage/storagemgr/partition/log_manager_test.go diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log_manager.go index 027d129f89b..b0d57367037 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log_manager.go @@ -159,7 +159,7 @@ func (mgr *LogManager) Initialize(ctx context.Context, appliedLSN storage.LSN) e } } - if mgr.consumer != nil { + if mgr.consumer != nil && mgr.appendedLSN != 0 { // Set acknowledged position to oldestLSN - 1 and notify the consumer from oldestLSN -> appendedLSN. // If set the position to 0, the consumer is unable to read pruned entry anyway. 
mgr.AcknowledgeConsumerPosition(mgr.oldestLSN - 1) diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log_manager_test.go new file mode 100644 index 00000000000..6d5e5308a6c --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log_manager_test.go @@ -0,0 +1,576 @@ +package partition + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func appendLogEntry(t *testing.T, ctx context.Context, manager *LogManager, files map[string][]byte) storage.LSN { + t.Helper() + + logEntryPath := testhelper.TempDir(t) + for name, value := range files { + path := filepath.Join(logEntryPath, name) + require.NoError(t, os.WriteFile(path, value, mode.File)) + } + + nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + require.NoError(t, err) + + return nextLSN +} + +func setupLogManager(t *testing.T, ctx context.Context, consumer LogConsumer) *LogManager { + logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer) + require.NoError(t, logManager.Initialize(ctx, 0)) + + return logManager +} + +func TestLogManager_Initialize(t *testing.T) { + t.Parallel() + + t.Run("initial state without prior log entries", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(0), logManager.appendedLSN) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", 
testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("existing WAL entries without existing appliedLSN", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(2), logManager.appendedLSN) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("existing WAL entries with appliedLSN in-between", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.createStateDirectory(ctx)) + + for i := 0; i < 3; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 2)) + + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(3), 
logManager.appendedLSN) + require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("existing WAL entries with up-to-date appliedLSN", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + for i := 0; i < 3; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 3)) + + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(3), logManager.appendedLSN) + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) +} + +func TestLogManager_PruneLogEntries(t *testing.T) { + t.Parallel() 
+ + t.Run("no entries to remove", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Expect no entries to be removed + require.NoError(t, logManager.PruneLogEntries(ctx)) + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + + // Assert on-disk state + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("remove single applied entry", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Inject a single log entry + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + + // Set this entry as applied + logManager.AcknowledgeAppliedPosition(1) + + // Before removal + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + }) + + // Attempt to remove applied log entries + require.NoError(t, logManager.PruneLogEntries(ctx)) + + // After removal + require.Equal(t, storage.LSN(2), logManager.oldestLSN) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("retain entry due to low-water mark constraint", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, &mockLogConsumer{}) + + // Inject multiple log entries + for i := 0; i < 3; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + // Set the applied LSN to 2 + logManager.AcknowledgeAppliedPosition(2) + // Manually set the consumer's position to the first entry, forcing 
low-water mark to retain it + logManager.AcknowledgeConsumerPosition(1) + + // Before removal + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + require.NoError(t, logManager.PruneLogEntries(ctx)) + require.Equal(t, storage.LSN(2), logManager.oldestLSN) + + // Assert on-disk state to ensure only the first entry was pruned and entries 2 and 3 were retained + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("remove multiple applied entries", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Inject multiple log entries + for i := 0; i < 5; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + // Set the applied LSN to 3, allowing the first three entries to be pruned + logManager.AcknowledgeAppliedPosition(3) + + // Before removal + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": 
{Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + "/wal/0000000000004": {Mode: mode.Directory}, + "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, + "/wal/0000000000005": {Mode: mode.Directory}, + "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, + }) + + require.NoError(t, logManager.PruneLogEntries(ctx)) + + // Ensure only entries starting from LSN 4 are retained + require.Equal(t, storage.LSN(4), logManager.oldestLSN) + + // Assert on-disk state after removals + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000004": {Mode: mode.Directory}, + "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, + "/wal/0000000000005": {Mode: mode.Directory}, + "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, + }) + }) +} + +func TestLogManager_AppendLogEntry(t *testing.T) { + t.Parallel() + + t.Run("append a log entry with a single file", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + }) + }) + + t.Run("append a log entry with multiple files", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, 
ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{ + "1": []byte("content-1"), + "2": []byte("content-2"), + "3": []byte("content-3"), + }) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000001/2": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000001/3": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("append multiple entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2-1"), "2": []byte("content-2-2")}) + appendLogEntry(t, ctx, logManager, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2-1")}, + "/wal/0000000000002/2": {Mode: mode.File, Content: []byte("content-2-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + }) + }) +} + +type mockLogConsumer struct { + mu sync.Mutex + positions [][]storage.LSN +} + +func (c *mockLogConsumer) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, oldestLSN, appendedLSN 
storage.LSN) { + c.mu.Lock() + defer c.mu.Unlock() + + c.positions = append(c.positions, []storage.LSN{oldestLSN, appendedLSN}) +} + +func TestLogManager_Positions(t *testing.T) { + ctx := testhelper.Context(t) + + simulatePositions := func(t *testing.T, logManager *LogManager, consumed storage.LSN, applied storage.LSN) { + logManager.AcknowledgeConsumerPosition(consumed) + logManager.AcknowledgeAppliedPosition(applied) + require.NoError(t, logManager.PruneLogEntries(ctx)) + } + + t.Run("consumer pos is set to 0 after initialized", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + require.Equal(t, [][]storage.LSN(nil), mockConsumer.positions) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("notify consumer after restart", func(t *testing.T) { + stateDir := testhelper.TempDir(t) + + // Before restart + mockConsumer := &mockLogConsumer{} + + logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + require.NoError(t, logManager.Initialize(ctx, 0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + // Apply to 2 but consume to 1 + simulatePositions(t, logManager, 1, 2) + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + + // Inject 3, 4 + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-4")}) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + 
"/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + "/wal/0000000000004": {Mode: mode.Directory}, + "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, + }) + + // Restart the log consumer. + mockConsumer = &mockLogConsumer{} + logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + require.NoError(t, logManager.Initialize(ctx, 2)) + + // Notify consumer to consume from 2 -> 4 + require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions) + + // Both consumer and applier catch up. + simulatePositions(t, logManager, 4, 4) + + // All log entries are pruned at this point. The consumer should not be notified again. + require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions) + require.Equal(t, storage.LSN(5), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("unacknowledged entries are not pruned", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + simulatePositions(t, logManager, 0, 2) + + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + 
"/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("acknowledged entries got pruned", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + simulatePositions(t, logManager, 1, 2) + + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("entries consumed faster than applied", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + simulatePositions(t, logManager, 2, 0) + + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("acknowledge entries one by one", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, 
ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + simulatePositions(t, logManager, 1, 1) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + simulatePositions(t, logManager, 2, 2) + + // The oldest LSN changes after each acknowledgement + require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("append while consumer is busy with prior entries", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + simulatePositions(t, logManager, 0, 1) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + simulatePositions(t, logManager, 0, 2) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + simulatePositions(t, logManager, 3, 3) + + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("acknowledged entries not pruned if not applied", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + + // 2 and 3 are not applied, hence kept intact. 
+ simulatePositions(t, logManager, 3, 1) + + require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + simulatePositions(t, logManager, 3, 3) + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index a5e23b694ff..53beba92869 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -964,9 +964,6 @@ type StateAssertion struct { // Repositories is the expected state of the repositories in the storage. The key is // the repository's relative path and the value describes its expected state. Repositories RepositoryStates - // Consumers is the expected state of the consumers and their position as tracked by - // the TransactionManager. - Consumers ConsumerState } // AdhocAssertion allows a test to add some custom assertions apart from the built-in assertions above. @@ -997,36 +994,6 @@ func (lc *MockLogConsumer) NotifyNewTransactions(storageName string, partitionID lc.highWaterMark = highWaterMark } -// ConsumerState is used to track the log positions received by the consumer and the corresponding -// acknowledgements from the consumer to the manager. We deliberately do not track the LowWaterMark -// sent to consumers as this is non-deterministic. 
-type ConsumerState struct { - // ManagerPosition is the last acknowledged LSN for the consumer as tracked by the TransactionManager. - ManagerPosition storage.LSN - // HighWaterMark is the latest high water mark received by the consumer from NotifyNewTransactions. - HighWaterMark storage.LSN -} - -// RequireConsumer asserts the consumer log position is correct. -func RequireConsumer(t *testing.T, consumer LogConsumer, consumerPos *position, expected ConsumerState) { - t.Helper() - - if consumerPos == nil { - return - } - - require.Equal(t, expected.ManagerPosition, consumerPos.getPosition(), "expected and actual manager position don't match") - - if consumer == nil { - return - } - - mock, ok := consumer.(*MockLogConsumer) - require.True(t, ok) - - require.Equal(t, expected.HighWaterMark, mock.highWaterMark, "expected and actual high water marks don't match") -} - // steps defines execution steps in a test. Each test case can define multiple steps to exercise // more complex behavior. type steps []any @@ -1575,8 +1542,6 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } } - RequireConsumer(t, transactionManager.logManager.consumer, transactionManager.logManager.positions[consumerPosition], tc.expectedState.Consumers) - testhelper.RequireDirectoryState(t, stateDir, "", expectedDirectory) expectedStagingDirState := testhelper.DirectoryState{ diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index 2b5cbf375e4..17029b01082 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -89,10 +89,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 0, - HighWaterMark: 1, - }, }, }, { @@ -162,10 
+158,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 1, - HighWaterMark: 1, - }, }, }, { @@ -256,10 +248,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 2, - HighWaterMark: 2, - }, }, }, { @@ -339,10 +327,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 1, - HighWaterMark: 1, - }, }, }, { @@ -469,10 +453,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 3, - HighWaterMark: 3, - }, }, }, { @@ -548,10 +528,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 1, - HighWaterMark: 1, - }, }, }, } -- GitLab From 2777dbd95cdcc2da61a768b6c5c3eef9386a81c5 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 25 Nov 2024 20:35:51 +0700 Subject: [PATCH 09/15] storage: Add accessor methods for LogManager Currently, TransactionManager accesses the WAL's indices through LogManager's internal variables. This commit adds accessor methods for that information. These methods improve isolation and give the log manager a better-shaped interface. 
--- .../storagemgr/partition/log_manager.go | 41 ++++++++++++------- .../partition/transaction_manager.go | 16 ++++---- .../transaction_manager_hook_test.go | 2 +- .../partition/transaction_manager_test.go | 10 ++--- 4 files changed, 40 insertions(+), 29 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log_manager.go index b0d57367037..ef63674ff78 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log_manager.go @@ -16,14 +16,14 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/safe" ) -// walStatePath returns the WAL directory's path. -func walStatePath(stateDir string) string { +// StatePath returns the WAL directory's path. +func StatePath(stateDir string) string { return filepath.Join(stateDir, "wal") } -// walFilesPathForLSN returns an absolute path to a given log entry's WAL files. -func walFilesPathForLSN(stateDir string, lsn storage.LSN) string { - return filepath.Join(walStatePath(stateDir), lsn.String()) +// EntryPath returns an absolute path to a given log entry's WAL files. +func EntryPath(stateDir string, lsn storage.LSN) string { + return filepath.Join(StatePath(stateDir), lsn.String()) } type positionType int @@ -55,7 +55,7 @@ func (p *position) setPosition(pos storage.LSN) { } type testLogHooks struct { - beforeAppendLogEntry func() + BeforeAppendLogEntry func() } // LogManager is responsible for managing the Write-Ahead Log (WAL) entries on disk. It maintains the in-memory state @@ -97,9 +97,9 @@ type LogManager struct { // notifyQueue is a queue notifying when there is a new change. notifyQueue chan struct{} - // testHooks are used in the tests to trigger logic at certain points in the execution. + // TestHooks are used in the tests to trigger logic at certain points in the execution. // They are used to synchronize more complex test scenarios. Not used in production. 
- testHooks testLogHooks + TestHooks testLogHooks } // NewLogManager returns an instance of LogManager. @@ -118,8 +118,8 @@ func NewLogManager(storageName string, partitionID storage.PartitionID, stagingD consumer: consumer, positions: positions, notifyQueue: make(chan struct{}, 1), - testHooks: testLogHooks{ - beforeAppendLogEntry: func() {}, + TestHooks: testLogHooks{ + BeforeAppendLogEntry: func() {}, }, } } @@ -148,7 +148,7 @@ func (mgr *LogManager) Initialize(ctx context.Context, appliedLSN storage.LSN) e // below to match. mgr.appendedLSN = appliedLSN - if logEntries, err := os.ReadDir(walStatePath(mgr.stateDirectory)); err != nil { + if logEntries, err := os.ReadDir(StatePath(mgr.stateDirectory)); err != nil { return fmt.Errorf("read wal directory: %w", err) } else if len(logEntries) > 0 { if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { @@ -190,6 +190,11 @@ func (mgr *LogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { } } +// StateDirectory returns the state directory under the management of this manager. +func (mgr *LogManager) StateDirectory() string { + return mgr.stateDirectory +} + // NotifyQueue returns a notify channel so that caller can poll new changes. func (mgr *LogManager) NotifyQueue() <-chan struct{} { return mgr.notifyQueue @@ -204,7 +209,7 @@ func (mgr *LogManager) AppendLogEntry(ctx context.Context, logEntryPath string) defer mgr.mutex.Unlock() nextLSN := mgr.appendedLSN + 1 - mgr.testHooks.beforeAppendLogEntry() + mgr.TestHooks.BeforeAppendLogEntry() // Move the log entry from the staging directory into its place in the log. destinationPath := mgr.GetEntryPath(nextLSN) @@ -298,9 +303,17 @@ func (mgr *LogManager) PruneLogEntries(ctx context.Context) error { return nil } +// AppendedLSN returns the index of latest appended log entry. 
+func (mgr *LogManager) AppendedLSN() storage.LSN { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + return mgr.appendedLSN +} + // GetEntryPath returns the path of the log entry's root directory. func (mgr *LogManager) GetEntryPath(lsn storage.LSN) string { - return walFilesPathForLSN(mgr.stateDirectory, lsn) + return EntryPath(mgr.stateDirectory, lsn) } // deleteLogEntry deletes the log entry at the given LSN from the log. @@ -312,7 +325,7 @@ func (mgr *LogManager) deleteLogEntry(ctx context.Context, lsn storage.LSN) erro return fmt.Errorf("mkdir temp: %w", err) } - logEntryPath := walFilesPathForLSN(mgr.stateDirectory, lsn) + logEntryPath := EntryPath(mgr.stateDirectory, lsn) // We can't delete a directory atomically as we have to first delete all of its content. // If the deletion was interrupted, we'd be left with a corrupted log entry on the disk. // To perform the deletion atomically, we move the to be deleted log entry out from the diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 458b81fb813..33aac987988 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -329,7 +329,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn := &Transaction{ write: opts.Write, commit: mgr.commit, - snapshotLSN: mgr.logManager.appendedLSN, + snapshotLSN: mgr.logManager.AppendedLSN(), finished: make(chan struct{}), relativePath: relativePath, metrics: mgr.metrics, @@ -2228,13 +2228,11 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { } for { - if mgr.appliedLSN < mgr.logManager.appendedLSN { + if mgr.appliedLSN < mgr.logManager.AppendedLSN() { lsn := mgr.appliedLSN + 1 - if err := mgr.applyLogEntry(ctx, lsn); err != nil { return fmt.Errorf("apply log entry: %w", err) } - continue } @@ -2398,7 +2396,7 @@ 
func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned // Commit the prepared transaction now that we've managed to commit the log entry. mgr.mutex.Lock() - preparedTX.Commit(ctx, mgr.logManager.appendedLSN) + preparedTX.Commit(ctx, mgr.logManager.AppendedLSN()) mgr.mutex.Unlock() return nil @@ -2407,7 +2405,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned return nil } - mgr.awaitingTransactions[mgr.logManager.appendedLSN] = transaction.result + mgr.awaitingTransactions[mgr.logManager.AppendedLSN()] = transaction.result return nil } @@ -2558,7 +2556,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // Each unapplied log entry should have a snapshot lock as they are created in normal // operation when committing a log entry. Recover these entries. - for i := mgr.appliedLSN + 1; i <= mgr.logManager.appendedLSN; i++ { + for i := mgr.appliedLSN + 1; i <= mgr.logManager.AppendedLSN(); i++ { mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})} } @@ -3453,7 +3451,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende // Pre-setup an snapshot lock entry for the assumed appended LSN location. 
mgr.mutex.Lock() - mgr.snapshotLocks[mgr.logManager.appendedLSN+1] = &snapshotLock{applied: make(chan struct{})} + mgr.snapshotLocks[mgr.logManager.AppendedLSN()+1] = &snapshotLock{applied: make(chan struct{})} mgr.mutex.Unlock() // After this latch block, the transaction is committed and all subsequent transactions @@ -3461,7 +3459,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende appendedLSN, err := mgr.logManager.AppendLogEntry(ctx, logEntryPath) if err != nil { mgr.mutex.Lock() - delete(mgr.snapshotLocks, mgr.logManager.appendedLSN+1) + delete(mgr.snapshotLocks, mgr.logManager.AppendedLSN()+1) mgr.mutex.Unlock() return fmt.Errorf("append log entry: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 15ec82f1e25..d15322bf055 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -28,7 +28,7 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, - &mgr.logManager.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, + &mgr.logManager.TestHooks.BeforeAppendLogEntry: hooks.BeforeAppendLogEntry, &mgr.testHooks.beforeRunExiting: func(hookContext) { if hooks.WaitForTransactionsWhenClosing { inflightTransactions.Wait() diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index e9de6ae064c..669ef5076b5 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ 
b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -1401,7 +1401,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio // // The Manager starts up and we expect the pack file to be gone at the end of the test. ModifyStorage: func(_ testing.TB, _ config.Cfg, storagePath string) { - packFilePath := packFilePath(walFilesPathForLSN(filepath.Join(storagePath, setup.RelativePath), 1)) + packFilePath := packFilePath(EntryPath(filepath.Join(storagePath, setup.RelativePath), 1)) require.NoError(t, os.MkdirAll(filepath.Dir(packFilePath), mode.Directory)) require.NoError(t, os.WriteFile( packFilePath, @@ -2113,7 +2113,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) // Transaction 2 and 3 are left-over. - testhelper.RequireDirectoryState(t, tm.logManager.stateDirectory, "", + testhelper.RequireDirectoryState(t, tm.logManager.StateDirectory(), "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -2129,7 +2129,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(3)) - require.Equal(t, tm.logManager.appendedLSN, storage.LSN(3)) + require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(3)) }), }, expectedState: StateAssertion{ @@ -2263,7 +2263,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) // Transaction 2 and 3 are left-over. 
- testhelper.RequireDirectoryState(t, tm.logManager.stateDirectory, "", testhelper.DirectoryState{ + testhelper.RequireDirectoryState(t, tm.logManager.StateDirectory(), "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, "/wal/0000000000004": {Mode: mode.Directory}, @@ -2280,7 +2280,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(4).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(4)) - require.Equal(t, tm.logManager.appendedLSN, storage.LSN(4)) + require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(4)) }), }, expectedState: StateAssertion{ -- GitLab From 065157df95e724968705ba2d82b97783eec315fd Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 26 Nov 2024 17:52:37 +0700 Subject: [PATCH 10/15] storage: Move backup.LogManager and LogConsumer interfaces to one place The LogManager and LogConsumer interfaces currently live in the backup and storagemgr/partition packages, respectively. They are used for communication between the storage layer and its external users, so it makes sense to keep them in one place. The backup package already imports some types from the `storage` package, so this commit moves both interfaces there. 
--- internal/backup/log_entry.go | 20 ++++------------ internal/cli/gitaly/serve.go | 2 +- internal/gitaly/storage/storage.go | 23 +++++++++++++++++++ .../storage/storagemgr/partition/factory.go | 17 ++------------ .../storagemgr/partition/log_manager.go | 4 ++-- .../storagemgr/partition/log_manager_test.go | 2 +- .../storagemgr/partition/testhelper_test.go | 2 +- .../partition/transaction_manager.go | 2 +- 8 files changed, 36 insertions(+), 36 deletions(-) diff --git a/internal/backup/log_entry.go b/internal/backup/log_entry.go index e569b09d63c..f3cf009e3b0 100644 --- a/internal/backup/log_entry.go +++ b/internal/backup/log_entry.go @@ -82,16 +82,6 @@ type PartitionInfo struct { PartitionID storage.PartitionID } -// LogManager is the interface used on the consumer side of the integration. The consumer -// has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction. -type LogManager interface { - // AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed - // for the specified LogConsumer. - AcknowledgeTransaction(lsn storage.LSN) - // GetTransactionPath returns the path of the log entry's root directory. - GetTransactionPath(lsn storage.LSN) string -} - // LogEntryArchiver is used to backup applied log entries. It has a configurable number of // worker goroutines that will perform backups. Each partition may only have one backup // executing at a time, entries are always processed in-order. Backup failures will trigger @@ -304,7 +294,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { // We have already backed up all entries sent by the LogManager, but the manager is // not aware of this. Acknowledge again with our last processed entry. 
if state.nextLSN > notification.highWaterMark { - if err := la.callLogManager(ctx, notification.partitionInfo, func(lm LogManager) { + if err := la.callLogManager(ctx, notification.partitionInfo, func(lm storage.LogManager) { lm.AcknowledgeTransaction(state.nextLSN - 1) }); err != nil { la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for already completed entry") @@ -338,7 +328,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { } } -func (la *LogEntryArchiver) callLogManager(ctx context.Context, partitionInfo PartitionInfo, callback func(lm LogManager)) error { +func (la *LogEntryArchiver) callLogManager(ctx context.Context, partitionInfo PartitionInfo, callback func(lm storage.LogManager)) error { storageHandle, err := (*la.node).GetStorage(partitionInfo.StorageName) if err != nil { return fmt.Errorf("get storage: %w", err) @@ -350,7 +340,7 @@ func (la *LogEntryArchiver) callLogManager(ctx context.Context, partitionInfo Pa } defer partition.Close() - logManager, ok := partition.(LogManager) + logManager, ok := partition.(storage.LogManager) if !ok { return fmt.Errorf("expected LogManager, got %T", logManager) } @@ -391,7 +381,7 @@ func (la *LogEntryArchiver) receiveEntry(ctx context.Context, entry *logEntry) { la.waitDur = minRetryWait } - if err := la.callLogManager(ctx, entry.partitionInfo, func(lm LogManager) { + if err := la.callLogManager(ctx, entry.partitionInfo, func(lm storage.LogManager) { lm.AcknowledgeTransaction(entry.lsn) }); err != nil { la.logger.WithError(err).WithFields( @@ -420,7 +410,7 @@ func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) { }) var entryPath string - if err := la.callLogManager(context.Background(), entry.partitionInfo, func(lm LogManager) { + if err := la.callLogManager(context.Background(), entry.partitionInfo, func(lm storage.LogManager) { entryPath = lm.GetTransactionPath(entry.lsn) }); err != nil { 
la.backupCounter.WithLabelValues("fail").Add(1) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index f4a27a34386..5a17edf5e66 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -390,7 +390,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { } defer dbMgr.Close() - var logConsumer partition.LogConsumer + var logConsumer storage.LogConsumer if cfg.Backup.WALGoCloudURL != "" { walSink, err := backup.ResolveSink(ctx, cfg.Backup.WALGoCloudURL) if err != nil { diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index d79ade69066..9d40b0507a6 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -139,6 +139,29 @@ type BeginOptions struct { ForceExclusiveSnapshot bool } +// LogConsumer is the interface of a log consumer that is passed to a TransactionManager. +// The LogConsumer may perform read-only operations against the on-disk log entry. +// The TransactionManager notifies the consumer of new transactions by invoking the +// NotifyNewTransactions method after they are committed. +type LogConsumer interface { + // NotifyNewTransactions alerts the LogConsumer that new log entries are available for + // consumption. The method is invoked both when the TransactionManager + // initializes and when new transactions are committed. Both the low and high water mark + // LSNs are sent so that a newly initialized consumer is aware of the full range of + // entries it can process. + NotifyNewTransactions(storageName string, partitionID PartitionID, lowWaterMark, highWaterMark LSN) +} + +// LogManager is the interface used on the consumer side of the integration. The consumer +// has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction. 
+type LogManager interface { + // AcknowledgeTransaction acknowledges log entries up to and including lsn as successfully processed + // for the specified LogConsumer. + AcknowledgeTransaction(lsn LSN) + // GetTransactionPath returns the path of the log entry's root directory. + GetTransactionPath(lsn LSN) string +} + // Partition is responsible for a single partition of data. type Partition interface { // Begin begins a transaction against the partition. diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index e2fabf936fd..7f85ce20aa8 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -12,25 +12,12 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) -// LogConsumer is the interface of a log consumer that is passed to a TransactionManager. -// The LogConsumer may perform read-only operations against the on-disk log entry. -// The TransactionManager notifies the consumer of new transactions by invoking the -// NotifyNewTransaction method after they are committed. -type LogConsumer interface { - // NotifyNewTransactions alerts the LogConsumer that new log entries are available for - // consumption. The method invoked both when the TransactionManager - // initializes and when new transactions are committed. Both the low and high water mark - // LSNs are sent so that a newly initialized consumer is aware of the full range of - // entries it can process. - NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) -} - // Factory is factory type that can create new partitions. type Factory struct { cmdFactory gitcmd.CommandFactory repoFactory localrepo.Factory metrics Metrics - logConsumer LogConsumer + logConsumer storage.LogConsumer } // New returns a new Partition instance. 
@@ -75,7 +62,7 @@ func NewFactory( cmdFactory gitcmd.CommandFactory, repoFactory localrepo.Factory, metrics Metrics, - logConsumer LogConsumer, + logConsumer storage.LogConsumer, ) Factory { return Factory{ cmdFactory: cmdFactory, diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log_manager.go index ef63674ff78..2eb2c0802f9 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log_manager.go @@ -90,7 +90,7 @@ type LogManager struct { // consumer is an external caller that may perform read-only operations against applied log entries. Log entries // are retained until the consumer has acknowledged past their LSN. - consumer LogConsumer + consumer storage.LogConsumer // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL positions map[positionType]*position @@ -103,7 +103,7 @@ type LogManager struct { } // NewLogManager returns an instance of LogManager. 
-func NewLogManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer LogConsumer) *LogManager { +func NewLogManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer storage.LogConsumer) *LogManager { positions := map[positionType]*position{ appliedPosition: newPosition(), } diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log_manager_test.go index 6d5e5308a6c..4ec54305032 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log_manager_test.go @@ -29,7 +29,7 @@ func appendLogEntry(t *testing.T, ctx context.Context, manager *LogManager, file return nextLSN } -func setupLogManager(t *testing.T, ctx context.Context, consumer LogConsumer) *LogManager { +func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogConsumer) *LogManager { logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer) require.NoError(t, logManager.Initialize(ctx, 0)) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 53beba92869..605b3030285 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -713,7 +713,7 @@ type testTransactionSetup struct { NonExistentOID git.ObjectID Commits testTransactionCommits AnnotatedTags []testTransactionTag - Consumer LogConsumer + Consumer storage.LogConsumer } type testTransactionHooks struct { diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 33aac987988..3ffef6a13be 100644 --- 
a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -1046,7 +1046,7 @@ func NewTransactionManager( cmdFactory gitcmd.CommandFactory, repositoryFactory localrepo.StorageScopedFactory, metrics ManagerMetrics, - consumer LogConsumer, + consumer storage.LogConsumer, ) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) -- GitLab From a34493d25aebeef9cebd1326efc6187c5bb7f7bc Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 20:57:32 +0700 Subject: [PATCH 11/15] storage: Rename methods of storage.LogManager and storage.LogConsumer Those interfaces use the term "transaction" in their method names. Using that term was perfectly fine before since the backup package interacts directly with TransactionManager through those interfaces. In upcoming commits, the backup package will interact with the log manager directly without going through TransactionManager. It will, in fact, deal with log entries. Transactions are not relevant anymore. This commit renames methods using the term "transaction" in their names in favor of the new log entry terminology. --- internal/backup/log_entry.go | 14 +++++++------- internal/backup/log_entry_test.go | 6 +++--- internal/gitaly/storage/storage.go | 14 +++++++------- .../storage/storagemgr/partition/log_manager.go | 4 ++-- .../storagemgr/partition/log_manager_test.go | 2 +- .../storagemgr/partition/testhelper_test.go | 6 +++--- .../storagemgr/partition/transaction_manager.go | 11 +++++------ 7 files changed, 28 insertions(+), 29 deletions(-) diff --git a/internal/backup/log_entry.go b/internal/backup/log_entry.go index f3cf009e3b0..a3b58d75e71 100644 --- a/internal/backup/log_entry.go +++ b/internal/backup/log_entry.go @@ -39,7 +39,7 @@ func newLogEntry(partitionInfo PartitionInfo, lsn storage.LSN) *logEntry { } } -// partitionNotification is used to store the data received by NotifyNewTransactions.
+// partitionNotification is used to store the data received by NotifyNewEntries. type partitionNotification struct { lowWaterMark storage.LSN highWaterMark storage.LSN @@ -172,8 +172,8 @@ func newLogEntryArchiver(logger log.Logger, archiveSink *Sink, workerCount uint, return archiver } -// NotifyNewTransactions passes the transaction information to the LogEntryArchiver for processing. -func (la *LogEntryArchiver) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { +// NotifyNewEntries passes the log entry information to the LogEntryArchiver for processing. +func (la *LogEntryArchiver) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { la.notificationsMutex.Lock() defer la.notificationsMutex.Unlock() @@ -295,7 +295,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { // not aware of this. Acknowledge again with our last processed entry. if state.nextLSN > notification.highWaterMark { if err := la.callLogManager(ctx, notification.partitionInfo, func(lm storage.LogManager) { - lm.AcknowledgeTransaction(state.nextLSN - 1) + lm.AcknowledgeConsumerPosition(state.nextLSN - 1) }); err != nil { la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for already completed entry") } @@ -382,7 +382,7 @@ func (la *LogEntryArchiver) receiveEntry(ctx context.Context, entry *logEntry) { } if err := la.callLogManager(ctx, entry.partitionInfo, func(lm storage.LogManager) { - lm.AcknowledgeTransaction(entry.lsn) + lm.AcknowledgeConsumerPosition(entry.lsn) }); err != nil { la.logger.WithError(err).WithFields( log.Fields{ @@ -411,7 +411,7 @@ func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) { var entryPath string if err := la.callLogManager(context.Background(), entry.partitionInfo, func(lm storage.LogManager) { - entryPath = lm.GetTransactionPath(entry.lsn) + entryPath = 
lm.GetEntryPath(entry.lsn) }); err != nil { la.backupCounter.WithLabelValues("fail").Add(1) la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for entry path") @@ -441,7 +441,7 @@ func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) { entry.success = true } -// backupLogEntry tar's the root directory of the transaction and writes it to the Sink. +// backupLogEntry tar's the root directory of the log entry and writes it to the Sink. func (la *LogEntryArchiver) backupLogEntry(ctx context.Context, partitionInfo PartitionInfo, lsn storage.LSN, entryPath string) (returnErr error) { timer := prometheus.NewTimer(la.backupLatency) defer timer.ObserveDuration() diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index b32534d9f85..b68dbe11236 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -66,7 +66,7 @@ type mockLogManager struct { func (lm *mockLogManager) Close() {} -func (lm *mockLogManager) AcknowledgeTransaction(lsn storage.LSN) { +func (lm *mockLogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { lm.Lock() defer lm.Unlock() @@ -103,12 +103,12 @@ func (lm *mockLogManager) AcknowledgeTransaction(lsn storage.LSN) { func (lm *mockLogManager) SendNotification() { n := lm.notifications[0] - lm.archiver.NotifyNewTransactions(lm.partitionInfo.StorageName, lm.partitionInfo.PartitionID, n.lowWaterMark, n.highWaterMark) + lm.archiver.NotifyNewEntries(lm.partitionInfo.StorageName, lm.partitionInfo.PartitionID, n.lowWaterMark, n.highWaterMark) lm.notifications = lm.notifications[1:] } -func (lm *mockLogManager) GetTransactionPath(lsn storage.LSN) string { +func (lm *mockLogManager) GetEntryPath(lsn storage.LSN) string { return filepath.Join(partitionPath(lm.entryRootPath, lm.partitionInfo), lsn.String()) } diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 9d40b0507a6..e41c24f3d55 100644 --- 
a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -144,22 +144,22 @@ type BeginOptions struct { // The TransactionManager notifies the consumer of new transactions by invoking the // NotifyNewTransaction method after they are committed. type LogConsumer interface { - // NotifyNewTransactions alerts the LogConsumer that new log entries are available for + // NotifyNewEntries alerts the LogConsumer that new log entries are available for // consumption. The method invoked both when the TransactionManager // initializes and when new transactions are committed. Both the low and high water mark // LSNs are sent so that a newly initialized consumer is aware of the full range of // entries it can process. - NotifyNewTransactions(storageName string, partitionID PartitionID, lowWaterMark, highWaterMark LSN) + NotifyNewEntries(storageName string, partitionID PartitionID, lowWaterMark, highWaterMark LSN) } // LogManager is the interface used on the consumer side of the integration. The consumer -// has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction. +// has the ability to acknowledge transactions as having been processed with AcknowledgeConsumerPosition. type LogManager interface { - // AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed + // AcknowledgeConsumerPosition acknowledges log entries up and including lsn as successfully processed // for the specified LogConsumer. - AcknowledgeTransaction(lsn LSN) - // GetTransactionPath returns the path of the log entry's root directory. - GetTransactionPath(lsn LSN) string + AcknowledgeConsumerPosition(lsn LSN) + // GetEntryPath returns the path of the log entry's root directory. + GetEntryPath(lsn LSN) string } // Partition is responsible for a single partition of data. 
diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log_manager.go index 2eb2c0802f9..51f2d59eb82 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log_manager.go @@ -163,7 +163,7 @@ func (mgr *LogManager) Initialize(ctx context.Context, appliedLSN storage.LSN) e // Set acknowledged position to oldestLSN - 1 and notify the consumer from oldestLSN -> appendedLSN. // If set the position to 0, the consumer is unable to read pruned entry anyway. mgr.AcknowledgeConsumerPosition(mgr.oldestLSN - 1) - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) + mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) } mgr.AcknowledgeAppliedPosition(appliedLSN) @@ -232,7 +232,7 @@ func (mgr *LogManager) AppendLogEntry(ctx context.Context, logEntryPath string) mgr.appendedLSN = nextLSN if mgr.consumer != nil { - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) + mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) } return nextLSN, nil } diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log_manager_test.go index 4ec54305032..852bace69ad 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log_manager_test.go @@ -359,7 +359,7 @@ type mockLogConsumer struct { positions [][]storage.LSN } -func (c *mockLogConsumer) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, oldestLSN, appendedLSN storage.LSN) { +func (c *mockLogConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, oldestLSN, appendedLSN storage.LSN) { c.mu.Lock() defer c.mu.Unlock() diff --git 
a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 605b3030285..13e0ae97e07 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -936,7 +936,7 @@ type Prune struct { ExpectedObjects []git.ObjectID } -// ConsumerAcknowledge calls AcknowledgeTransaction for all consumers. +// ConsumerAcknowledge calls AcknowledgeConsumerPosition for all consumers. type ConsumerAcknowledge struct { // LSN is the LSN acknowledged by the consumers. LSN storage.LSN @@ -990,7 +990,7 @@ type MockLogConsumer struct { highWaterMark storage.LSN } -func (lc *MockLogConsumer) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { +func (lc *MockLogConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { lc.highWaterMark = highWaterMark } @@ -1424,7 +1424,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction := openTransactions[step.TransactionID] transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: - transactionManager.AcknowledgeTransaction(step.LSN) + transactionManager.AcknowledgeConsumerPosition(step.LSN) case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 3ffef6a13be..6d525da3571 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -889,15 +889,14 @@ type committedEntry struct { objectDependencies 
map[git.ObjectID]struct{} } -// AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed -// for the specified LogConsumer. The manager is awakened if it is currently awaiting a new or -// completed transaction. -func (mgr *TransactionManager) AcknowledgeTransaction(lsn storage.LSN) { +// AcknowledgeConsumerPosition acknowledges log entries up and including lsn as successfully processed for the specified +// LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction. +func (mgr *TransactionManager) AcknowledgeConsumerPosition(lsn storage.LSN) { mgr.logManager.AcknowledgeConsumerPosition(lsn) } -// GetTransactionPath returns the path of the log entry's root directory. -func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string { +// GetEntryPath returns the path of the log entry's root directory. +func (mgr *TransactionManager) GetEntryPath(lsn storage.LSN) string { return mgr.logManager.GetEntryPath(lsn) } -- GitLab From 47f37a1bcea1afc5f3985ac7c7ff52135dfdc5d6 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 21:06:43 +0700 Subject: [PATCH 12/15] backup: Let backup work directly with underlying LogManager In prior commits, the log management functionality was extracted out of TransactionManager. The backup functionality followed this extraction. The new LogManager is now responsible for tracking all indices as well as consumer positions. However, the backup package still contacts the TransactionManager for acknowledgement. This commit breaks up this relationship. The backup now works directly with the new LogManager instead of detouring through the TransactionManager.
--- internal/backup/log_entry.go | 7 +--- internal/backup/log_entry_test.go | 33 ++++++++++++------- internal/gitaly/storage/storage.go | 3 ++ .../storagemgr/partition/testhelper_test.go | 2 +- .../partition/transaction_manager.go | 13 +++----- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/internal/backup/log_entry.go b/internal/backup/log_entry.go index a3b58d75e71..91b82c7ed26 100644 --- a/internal/backup/log_entry.go +++ b/internal/backup/log_entry.go @@ -340,12 +340,7 @@ func (la *LogEntryArchiver) callLogManager(ctx context.Context, partitionInfo Pa } defer partition.Close() - logManager, ok := partition.(storage.LogManager) - if !ok { - return fmt.Errorf("expected LogManager, got %T", logManager) - } - - callback(logManager) + callback(partition.GetLogManager()) return nil } diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index b68dbe11236..18d4d152d01 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -22,9 +22,20 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) +type mockPartition struct { + storage.Partition + manager storage.LogManager +} + +func (p *mockPartition) Close() {} + +func (p *mockPartition) GetLogManager() storage.LogManager { + return p.manager +} + type mockNode struct { - managers map[PartitionInfo]*mockLogManager - t *testing.T + partitions map[PartitionInfo]*mockPartition + t *testing.T sync.Mutex } @@ -44,7 +55,7 @@ func (m mockStorage) GetPartition(ctx context.Context, partitionID storage.Parti defer m.node.Unlock() info := PartitionInfo{m.storageName, partitionID} - mgr, ok := m.node.managers[info] + mgr, ok := m.node.partitions[info] assert.True(m.node.t, ok) return mgr, nil @@ -61,7 +72,6 @@ type mockLogManager struct { finishCount int sync.Mutex - storage.Partition } func (lm *mockLogManager) Close() {} @@ -295,8 +305,8 @@ func TestLogEntryArchiver(t *testing.T) { var wg sync.WaitGroup accessor := &mockNode{ - managers: 
make(map[PartitionInfo]*mockLogManager, len(tc.partitions)), - t: t, + partitions: make(map[PartitionInfo]*mockPartition, len(tc.partitions)), + t: t, } node := storage.Node(accessor) @@ -330,7 +340,7 @@ func TestLogEntryArchiver(t *testing.T) { managers[info] = manager accessor.Lock() - accessor.managers[info] = manager + accessor.partitions[info] = &mockPartition{manager: manager} accessor.Unlock() // Send partitions in parallel to mimic real usage. @@ -358,7 +368,8 @@ func TestLogEntryArchiver(t *testing.T) { cmpDir := testhelper.TempDir(t) require.NoError(t, os.Mkdir(filepath.Join(cmpDir, storageName), mode.Directory)) - for info, manager := range accessor.managers { + for info, partition := range accessor.partitions { + manager := partition.GetLogManager().(*mockLogManager) lastAck := manager.acknowledged[len(manager.acknowledged)-1] require.Equal(t, tc.finalLSN, lastAck) @@ -417,8 +428,8 @@ func TestLogEntryArchiver_retry(t *testing.T) { require.NoError(t, err) accessor := &mockNode{ - managers: make(map[PartitionInfo]*mockLogManager, 1), - t: t, + partitions: make(map[PartitionInfo]*mockPartition, 1), + t: t, } node := storage.Node(accessor) @@ -447,7 +458,7 @@ func TestLogEntryArchiver_retry(t *testing.T) { } accessor.Lock() - accessor.managers[info] = manager + accessor.partitions[info] = &mockPartition{manager: manager} accessor.Unlock() wg.Add(1) diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index e41c24f3d55..9e0432f097d 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -168,6 +168,9 @@ type Partition interface { Begin(context.Context, BeginOptions) (Transaction, error) // Close closes the partition handle to signal the caller is done using it. Close() + // GetLogManager provides controlled access to underlying log management system for log consumption purpose. It + // allows the consumers to access to on-disk location of a LSN and acknowledge consumed position. 
+ GetLogManager() LogManager } // TransactionOptions are used to pass transaction options into Begin. diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 13e0ae97e07..39c0cfc437d 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1424,7 +1424,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction := openTransactions[step.TransactionID] transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: - transactionManager.AcknowledgeConsumerPosition(step.LSN) + transactionManager.logManager.AcknowledgeConsumerPosition(step.LSN) case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 6d525da3571..cbd986cd78f 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -889,15 +889,10 @@ type committedEntry struct { objectDependencies map[git.ObjectID]struct{} } -// AcknowledgeConsumerPosition acknowledges log entries up and including lsn as successfully processed for the specified -// LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction. -func (mgr *TransactionManager) AcknowledgeConsumerPosition(lsn storage.LSN) { - mgr.logManager.AcknowledgeConsumerPosition(lsn) -} - -// GetEntryPath returns the path of the log entry's root directory. 
-func (mgr *TransactionManager) GetEntryPath(lsn storage.LSN) string { - return mgr.logManager.GetEntryPath(lsn) +// GetLogManager provides controlled access to underlying log management system for log consumption purpose. It +// allows the consumers to access to on-disk location of a LSN and acknowledge consumed position. +func (mgr *TransactionManager) GetLogManager() storage.LogManager { + return mgr.logManager } // TransactionManager is responsible for transaction management of a single repository. Each repository has -- GitLab From b83c77b77c17d21b2c848594fcd534455fdba865 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 21 Nov 2024 21:14:22 +0700 Subject: [PATCH 13/15] storage: Move LogManager to "storage/storagemgr/partition/log" package This commit relocates LogManager to the "storage/storagemgr/partition/log" package, reinforcing encapsulation and logical grouping of WAL functionalities. The move follows previous refinements that improved its interface and test coverage. With this change, we enhance code organization, readability, and maintainability, paving the way for future enhancements without impacting other system components. 
--- .../storagemgr/partition/{ => log}/log_manager.go | 2 +- .../partition/{ => log}/log_manager_test.go | 2 +- .../storagemgr/partition/log/testhelper_test.go | 11 +++++++++++ .../storagemgr/partition/transaction_manager.go | 11 ++++++----- .../storagemgr/partition/transaction_manager_test.go | 3 ++- 5 files changed, 21 insertions(+), 8 deletions(-) rename internal/gitaly/storage/storagemgr/partition/{ => log}/log_manager.go (99%) rename internal/gitaly/storage/storagemgr/partition/{ => log}/log_manager_test.go (99%) create mode 100644 internal/gitaly/storage/storagemgr/partition/log/testhelper_test.go diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go similarity index 99% rename from internal/gitaly/storage/storagemgr/partition/log_manager.go rename to internal/gitaly/storage/storagemgr/partition/log/log_manager.go index 51f2d59eb82..341f24a5153 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -1,4 +1,4 @@ -package partition +package log import ( "context" diff --git a/internal/gitaly/storage/storagemgr/partition/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go similarity index 99% rename from internal/gitaly/storage/storagemgr/partition/log_manager_test.go rename to internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 852bace69ad..b2f14c3f7ff 100644 --- a/internal/gitaly/storage/storagemgr/partition/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -1,4 +1,4 @@ -package partition +package log import ( "context" diff --git a/internal/gitaly/storage/storagemgr/partition/log/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/log/testhelper_test.go new file mode 100644 index 00000000000..62ab82a8536 --- /dev/null +++ 
b/internal/gitaly/storage/storagemgr/partition/log/testhelper_test.go @@ -0,0 +1,11 @@ +package log + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index cbd986cd78f..3aaeb42210d 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -37,10 +37,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal/reftree" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/tracing" @@ -938,7 +939,7 @@ type TransactionManager struct { // close cancels ctx and stops the transaction processing. close context.CancelFunc // logger is the logger to use to write log messages. - logger log.Logger + logger logging.Logger // closing is closed when close is called. It unblock transactions that are waiting to be admitted. closing <-chan struct{} @@ -965,7 +966,7 @@ type TransactionManager struct { // db is the handle to the key-value store used for storing the write-ahead log related state. 
db keyvalue.Transactioner // logManager manages the underlying Write-Ahead Log entries. - logManager *LogManager + logManager *log.LogManager // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction @@ -1031,7 +1032,7 @@ type testHooks struct { // NewTransactionManager returns a new TransactionManager for the given repository. func NewTransactionManager( ptnID storage.PartitionID, - logger log.Logger, + logger logging.Logger, db keyvalue.Transactioner, storageName, storagePath, @@ -1059,7 +1060,7 @@ func NewTransactionManager( storagePath: storagePath, partitionID: ptnID, db: db, - logManager: NewLogManager(storageName, ptnID, stagingDir, stateDir, consumer), + logManager: log.NewLogManager(storageName, ptnID, stagingDir, stateDir, consumer), admissionQueue: make(chan *Transaction), completedQueue: make(chan struct{}, 1), initialized: make(chan struct{}), diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 669ef5076b5..eccf30f3f62 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -29,6 +29,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/refdb" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -1401,7 +1402,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio // // The Manager starts up and we expect the pack 
file to be gone at the end of the test. ModifyStorage: func(_ testing.TB, _ config.Cfg, storagePath string) { - packFilePath := packFilePath(EntryPath(filepath.Join(storagePath, setup.RelativePath), 1)) + packFilePath := packFilePath(log.EntryPath(filepath.Join(storagePath, setup.RelativePath), 1)) require.NoError(t, os.MkdirAll(filepath.Dir(packFilePath), mode.Directory)) require.NoError(t, os.WriteFile( packFilePath, -- GitLab From 2a29549bb8c927cc94cdf9aa1f9745b08fa46912 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 26 Nov 2024 17:55:10 +0700 Subject: [PATCH 14/15] storage: Rename classes with repetitive package name In a previous commit, LogManager was moved to "log" package. Because the new name of the package is "log", calling "log.LogManager" is redundant. That class and corresponding initializer should be renamed to Manager so that the caller can use "log.Manager" and "log.NewManager". --- .../storagemgr/partition/log/log_manager.go | 40 +++++++++---------- .../partition/log/log_manager_test.go | 27 +++++++------ .../partition/transaction_manager.go | 4 +- 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index 341f24a5153..dbae71e2eed 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -58,20 +58,20 @@ type testLogHooks struct { BeforeAppendLogEntry func() } -// LogManager is responsible for managing the Write-Ahead Log (WAL) entries on disk. It maintains the in-memory state -// and indexing system that reflect the functional state of the WAL. The LogManager ensures safe and consistent +// Manager is responsible for managing the Write-Ahead Log (WAL) entries on disk. It maintains the in-memory state +// and indexing system that reflect the functional state of the WAL. 
The Manager ensures safe and consistent // proposals, applications, and prunings of log entries, acting as the interface for transactional log operations. It // coordinates with LogConsumer to allow safe consumption of log entries while handling retention and cleanup based on // references and acknowledgements. It effectively abstracts WAL operations from the TransactionManager, contributing to // a cleaner separation of concerns and making the system more maintainable and extensible. -type LogManager struct { +type Manager struct { // mutex protects access to critical states, especially `oldestLSN` and `appendedLSN`, as well as the integrity // of inflight log entries. Since indices are monotonic, two parallel log appending operations result in pushing // files into the same directory and breaking the manifest file. Thus, Parallel log entry appending and pruning // are not supported. mutex sync.Mutex - // storageName is the name of the storage the LogManager's partition is a member of. + // storageName is the name of the storage the Manager's partition is a member of. storageName string // storage.PartitionID is the ID of the partition this manager is operating on. partitionID storage.PartitionID @@ -102,15 +102,15 @@ type LogManager struct { TestHooks testLogHooks } -// NewLogManager returns an instance of LogManager. -func NewLogManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer storage.LogConsumer) *LogManager { +// NewManager returns an instance of Manager. 
+func NewManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer storage.LogConsumer) *Manager { positions := map[positionType]*position{ appliedPosition: newPosition(), } if consumer != nil { positions[consumerPosition] = newPosition() } - return &LogManager{ + return &Manager{ storageName: storageName, partitionID: partitionID, tmpDirectory: stagingDirectory, @@ -124,14 +124,14 @@ func NewLogManager(storageName string, partitionID storage.PartitionID, stagingD } } -// Initialize sets up the initial state of the LogManager, preparing it to manage the write-ahead log entries. It reads +// Initialize sets up the initial state of the Manager, preparing it to manage the write-ahead log entries. It reads // the last applied LSN from the database to resume from where it left off, creates necessary directories, and // initializes in-memory tracking variables such as appendedLSN and oldestLSN based on the files present in the WAL // directory. This method also removes any stale log files that may have been left due to interrupted operations, // ensuring the WAL directory only contains valid log entries. If a LogConsumer is present, it notifies it of the // initial log entry state, enabling consumers to start processing from the correct point. Proper initialization is // crucial for maintaining data consistency and ensuring that log entries are managed accurately upon system startup. -func (mgr *LogManager) Initialize(ctx context.Context, appliedLSN storage.LSN) error { +func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) error { if err := mgr.createStateDirectory(ctx); err != nil { return fmt.Errorf("create state directory: %w", err) } @@ -171,13 +171,13 @@ func (mgr *LogManager) Initialize(ctx context.Context, appliedLSN storage.LSN) e } // AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. 
-func (mgr *LogManager) AcknowledgeAppliedPosition(lsn storage.LSN) { +func (mgr *Manager) AcknowledgeAppliedPosition(lsn storage.LSN) { mgr.positions[appliedPosition].setPosition(lsn) } // AcknowledgeConsumerPosition acknowledges log entries up and including LSN as successfully processed for the specified // LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction. -func (mgr *LogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { +func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { if mgr.consumer == nil { panic("log manager's consumer must be present prior to AcknowledgeConsumerPos call") } @@ -191,12 +191,12 @@ func (mgr *LogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { } // StateDirectory returns the state directory under the management of this manager. -func (mgr *LogManager) StateDirectory() string { +func (mgr *Manager) StateDirectory() string { return mgr.stateDirectory } // NotifyQueue returns a notify channel so that caller can poll new changes. -func (mgr *LogManager) NotifyQueue() <-chan struct{} { +func (mgr *Manager) NotifyQueue() <-chan struct{} { return mgr.notifyQueue } @@ -204,7 +204,7 @@ func (mgr *LogManager) NotifyQueue() <-chan struct{} { // absolute path to the directory that represents the log entry. appendLogEntry // moves the log entry's directory to the WAL, and returns its LSN once it has // been committed to the log. 
-func (mgr *LogManager) AppendLogEntry(ctx context.Context, logEntryPath string) (storage.LSN, error) { +func (mgr *Manager) AppendLogEntry(ctx context.Context, logEntryPath string) (storage.LSN, error) { mgr.mutex.Lock() defer mgr.mutex.Unlock() @@ -237,7 +237,7 @@ func (mgr *LogManager) AppendLogEntry(ctx context.Context, logEntryPath string) return nextLSN, nil } -func (mgr *LogManager) createStateDirectory(ctx context.Context) error { +func (mgr *Manager) createStateDirectory(ctx context.Context) error { needsFsync := false for _, path := range []string{ mgr.stateDirectory, @@ -276,7 +276,7 @@ func (mgr *LogManager) createStateDirectory(ctx context.Context) error { // needed. It ensures efficient storage management by removing redundant entries while maintaining the integrity of the // log sequence. The method respects the established low-water mark, ensuring no entries that might still be required // for transaction consistency are deleted. -func (mgr *LogManager) PruneLogEntries(ctx context.Context) error { +func (mgr *Manager) PruneLogEntries(ctx context.Context) error { mgr.mutex.Lock() defer mgr.mutex.Unlock() @@ -304,7 +304,7 @@ func (mgr *LogManager) PruneLogEntries(ctx context.Context) error { } // AppendedLSN returns the index of latest appended log entry. -func (mgr *LogManager) AppendedLSN() storage.LSN { +func (mgr *Manager) AppendedLSN() storage.LSN { mgr.mutex.Lock() defer mgr.mutex.Unlock() @@ -312,12 +312,12 @@ func (mgr *LogManager) AppendedLSN() storage.LSN { } // GetEntryPath returns the path of the log entry's root directory. -func (mgr *LogManager) GetEntryPath(lsn storage.LSN) string { +func (mgr *Manager) GetEntryPath(lsn storage.LSN) string { return EntryPath(mgr.stateDirectory, lsn) } // deleteLogEntry deletes the log entry at the given LSN from the log. 
-func (mgr *LogManager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { +func (mgr *Manager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { defer trace.StartRegion(ctx, "deleteLogEntry").End() tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") @@ -352,7 +352,7 @@ func (mgr *LogManager) deleteLogEntry(ctx context.Context, lsn storage.LSN) erro // lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than // this mark are removed. -func (mgr *LogManager) lowWaterMark() storage.LSN { +func (mgr *Manager) lowWaterMark() storage.LSN { minAcknowledged := mgr.appendedLSN + 1 // Position is the last acknowledged LSN, this is eligible for pruning. diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index b2f14c3f7ff..27ca31a57e1 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -14,7 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) -func appendLogEntry(t *testing.T, ctx context.Context, manager *LogManager, files map[string][]byte) storage.LSN { +func appendLogEntry(t *testing.T, ctx context.Context, manager *Manager, files map[string][]byte) storage.LSN { t.Helper() logEntryPath := testhelper.TempDir(t) @@ -29,8 +29,8 @@ func appendLogEntry(t *testing.T, ctx context.Context, manager *LogManager, file return nextLSN } -func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogConsumer) *LogManager { - logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer) +func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogConsumer) *Manager { + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer) require.NoError(t, 
logManager.Initialize(ctx, 0)) return logManager @@ -44,7 +44,7 @@ func TestLogManager_Initialize(t *testing.T) { ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) require.Equal(t, storage.LSN(1), logManager.oldestLSN) @@ -61,13 +61,13 @@ func TestLogManager_Initialize(t *testing.T) { ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) - logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) require.Equal(t, storage.LSN(1), logManager.oldestLSN) require.Equal(t, storage.LSN(2), logManager.appendedLSN) @@ -88,14 +88,14 @@ func TestLogManager_Initialize(t *testing.T) { ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.createStateDirectory(ctx)) for i := 0; i < 3; i++ { appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } - logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 2)) 
require.Equal(t, storage.LSN(1), logManager.oldestLSN) @@ -118,14 +118,15 @@ func TestLogManager_Initialize(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } - logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) require.NoError(t, logManager.Initialize(ctx, 3)) require.Equal(t, storage.LSN(1), logManager.oldestLSN) @@ -369,7 +370,7 @@ func (c *mockLogConsumer) NotifyNewEntries(storageName string, partitionID stora func TestLogManager_Positions(t *testing.T) { ctx := testhelper.Context(t) - simulatePositions := func(t *testing.T, logManager *LogManager, consumed storage.LSN, applied storage.LSN) { + simulatePositions := func(t *testing.T, logManager *Manager, consumed storage.LSN, applied storage.LSN) { logManager.AcknowledgeConsumerPosition(consumed) logManager.AcknowledgeAppliedPosition(applied) require.NoError(t, logManager.PruneLogEntries(ctx)) @@ -394,7 +395,7 @@ func TestLogManager_Positions(t *testing.T) { // Before restart mockConsumer := &mockLogConsumer{} - logManager := NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) require.NoError(t, logManager.Initialize(ctx, 0)) appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) @@ -422,7 +423,7 @@ func TestLogManager_Positions(t *testing.T) { // Restart the log consumer. 
mockConsumer = &mockLogConsumer{} - logManager = NewLogManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) require.NoError(t, logManager.Initialize(ctx, 2)) // Notify consumer to consume from 2 -> 4 diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 3aaeb42210d..6ef6c7be45b 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -966,7 +966,7 @@ type TransactionManager struct { // db is the handle to the key-value store used for storing the write-ahead log related state. db keyvalue.Transactioner // logManager manages the underlying Write-Ahead Log entries. - logManager *log.LogManager + logManager *log.Manager // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction @@ -1060,7 +1060,7 @@ func NewTransactionManager( storagePath: storagePath, partitionID: ptnID, db: db, - logManager: log.NewLogManager(storageName, ptnID, stagingDir, stateDir, consumer), + logManager: log.NewManager(storageName, ptnID, stagingDir, stateDir, consumer), admissionQueue: make(chan *Transaction), completedQueue: make(chan struct{}, 1), initialized: make(chan struct{}), -- GitLab From be8cdecf76ada50629ed68053aa166f6f42f4621 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 19 Nov 2024 20:49:13 +0700 Subject: [PATCH 15/15] backup: Add an integration test that uses real WAL log manager In the tests of the internal/backup package, we mock the log manager extensively. It was because it's not trivial to set up a Transaction Manager inside this package. Using mocks comes with a cost. We noticed that the type assertion in the implementation never passed outside of tests.
Now the log management is detached from Transaction Manager. It makes more sense to write an integration test so that the log archiver uses a real WAL log manager. This test eliminates the aforementioned problem. --- internal/backup/log_entry_test.go | 122 ++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index 18d4d152d01..91eaf2059ed 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/archive" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) @@ -573,3 +574,124 @@ func buildMetrics(t *testing.T, successCt, failCt int) string { return builder.String() } + +// TestLogEntryArchiver_WithRealLogManager runs the log archiver with a real WAL log manager. This test aims to assert +// the flow between the two components to avoid uncaught errors due to mocking. Test setup and verification are +// extremely verbose. There's no reliable way to simulate more sophisticated scenarios because the test could not +// intercept internal states of log archiver or log manager. The log manager doesn't expose the current consumer +// position. Thus, this test verifies the most basic archiving. Unit tests using mocking will cover the rest. 
+func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + // Setup node + accessor := &mockNode{ + partitions: make(map[PartitionInfo]*mockPartition), + t: t, + } + node := storage.Node(accessor) + + // Setup archiver + archivePath := testhelper.TempDir(t) + archiveSink, err := ResolveSink(ctx, archivePath) + require.NoError(t, err) + + archiver := NewLogEntryArchiver(testhelper.NewLogger(t), archiveSink, 1, &node) + archiver.Run() + + // Setup WAL log manager and plug it to storage + const storageName = "default" + var logManagers []*log.Manager + for i := 1; i <= 2; i++ { + info := PartitionInfo{ + StorageName: storageName, + PartitionID: storage.PartitionID(i), + } + + logManager := log.NewManager(storageName, storage.PartitionID(i), testhelper.TempDir(t), testhelper.TempDir(t), archiver) + require.NoError(t, logManager.Initialize(ctx, 0)) + + accessor.Lock() + accessor.partitions[info] = &mockPartition{manager: logManager} + accessor.Unlock() + + logManagers = append(logManagers, logManager) + } + + appendLogEntry(t, ctx, logManagers[0], map[string][]byte{ + "1": []byte("content-1"), + "2": []byte("content-2"), + "3": []byte("content-3"), + }) + <-logManagers[0].NotifyQueue() + + // KV operation without any file. 
+ appendLogEntry(t, ctx, logManagers[1], map[string][]byte{}) + <-logManagers[1].NotifyQueue() + + appendLogEntry(t, ctx, logManagers[1], map[string][]byte{ + "4": []byte("content-4"), + "5": []byte("content-5"), + "6": []byte("content-6"), + }) + <-logManagers[1].NotifyQueue() + + archiver.Close() + + cmpDir := testhelper.TempDir(t) + require.NoError(t, os.Mkdir(filepath.Join(cmpDir, storageName), mode.Directory)) + + lsnPrefix := storage.LSN(1).String() + + // Assert manager 1 + tarPath := filepath.Join(partitionPath(archivePath, PartitionInfo{storageName, storage.PartitionID(1)}), storage.LSN(1).String()+".tar") + tar, err := os.Open(tarPath) + require.NoError(t, err) + testhelper.RequireTarState(t, tar, testhelper.DirectoryState{ + lsnPrefix: {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, + filepath.Join(lsnPrefix, "1"): {Mode: archive.TarFileMode, Content: []byte("content-1")}, + filepath.Join(lsnPrefix, "2"): {Mode: archive.TarFileMode, Content: []byte("content-2")}, + filepath.Join(lsnPrefix, "3"): {Mode: archive.TarFileMode, Content: []byte("content-3")}, + }) + testhelper.MustClose(t, tar) + + // Assert manager 2 + tarPath = filepath.Join(partitionPath(archivePath, PartitionInfo{storageName, storage.PartitionID(2)}), lsnPrefix+".tar") + tar, err = os.Open(tarPath) + require.NoError(t, err) + testhelper.RequireTarState(t, tar, testhelper.DirectoryState{ + lsnPrefix: {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, + }) + testhelper.MustClose(t, tar) + + lsnPrefix = storage.LSN(2).String() + tarPath = filepath.Join(partitionPath(archivePath, PartitionInfo{storageName, storage.PartitionID(2)}), lsnPrefix+".tar") + tar, err = os.Open(tarPath) + require.NoError(t, err) + + testhelper.RequireTarState(t, tar, testhelper.DirectoryState{ + lsnPrefix: {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, + filepath.Join(lsnPrefix, "4"): {Mode: archive.TarFileMode, Content: []byte("content-4")}, + filepath.Join(lsnPrefix, 
"5"): {Mode: archive.TarFileMode, Content: []byte("content-5")}, + filepath.Join(lsnPrefix, "6"): {Mode: archive.TarFileMode, Content: []byte("content-6")}, + }) + testhelper.MustClose(t, tar) + + // Finally, assert the metrics. + testhelper.RequirePromMetrics(t, archiver.backupCounter, buildMetrics(t, 3, 0)) +} + +func appendLogEntry(t *testing.T, ctx context.Context, manager *log.Manager, files map[string][]byte) storage.LSN { + t.Helper() + + logEntryPath := testhelper.TempDir(t) + for name, value := range files { + path := filepath.Join(logEntryPath, name) + require.NoError(t, os.WriteFile(path, value, mode.File)) + } + + nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + require.NoError(t, err) + + return nextLSN +} -- GitLab