diff --git a/internal/backup/log_entry.go b/internal/backup/log_entry.go index d347a644888a8dbdf3e81d8b399a697a22b5625a..6411501124e1a3ff3a99d4fd880efae8463ddce2 100644 --- a/internal/backup/log_entry.go +++ b/internal/backup/log_entry.go @@ -13,8 +13,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/archive" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) const ( @@ -88,7 +89,7 @@ type PartitionInfo struct { // an exponential backoff. type LogEntryArchiver struct { // logger is the logger to use to write log messages. - logger log.Logger + logger logging.Logger // store is where the log archives are kept. store LogEntryStore // node is used to access the LogManagers. @@ -130,12 +131,12 @@ type LogEntryArchiver struct { } // NewLogEntryArchiver constructs a new LogEntryArchiver. -func NewLogEntryArchiver(logger log.Logger, archiveSink *Sink, workerCount uint, node *storage.Node) *LogEntryArchiver { +func NewLogEntryArchiver(logger logging.Logger, archiveSink *Sink, workerCount uint, node *storage.Node) *LogEntryArchiver { return newLogEntryArchiver(logger, archiveSink, workerCount, node, helper.NewTimerTicker) } // newLogEntryArchiver constructs a new LogEntryArchiver with a configurable ticker function. 
-func newLogEntryArchiver(logger log.Logger, archiveSink *Sink, workerCount uint, node *storage.Node, tickerFunc func(time.Duration) helper.Ticker) *LogEntryArchiver { +func newLogEntryArchiver(logger logging.Logger, archiveSink *Sink, workerCount uint, node *storage.Node, tickerFunc func(time.Duration) helper.Ticker) *LogEntryArchiver { if workerCount < 1 { workerCount = 1 } @@ -294,8 +295,8 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { // We have already backed up all entries sent by the LogManager, but the manager is // not aware of this. Acknowledge again with our last processed entry. if state.nextLSN > notification.highWaterMark { - if err := la.callLogReader(ctx, notification.partitionInfo, func(lm storage.LogReader) { - lm.AcknowledgeConsumerPosition(state.nextLSN - 1) + if err := la.callLogReader(ctx, notification.partitionInfo, func(lm storage.LogReader) error { + return lm.AcknowledgePosition(log.ConsumerPosition, state.nextLSN-1) }); err != nil { la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for already completed entry") } @@ -306,7 +307,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { // we will be unable to backup the full sequence. 
if state.nextLSN < notification.lowWaterMark { la.logger.WithFields( - log.Fields{ + logging.Fields{ "storage": notification.partitionInfo.StorageName, "partition_id": notification.partitionInfo.PartitionID, "expected_lsn": state.nextLSN, @@ -328,7 +329,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { } } -func (la *LogEntryArchiver) callLogReader(ctx context.Context, partitionInfo PartitionInfo, callback func(lm storage.LogReader)) error { +func (la *LogEntryArchiver) callLogReader(ctx context.Context, partitionInfo PartitionInfo, callback func(lm storage.LogReader) error) error { storageHandle, err := (*la.node).GetStorage(partitionInfo.StorageName) if err != nil { return fmt.Errorf("get storage: %w", err) @@ -340,7 +341,9 @@ func (la *LogEntryArchiver) callLogReader(ctx context.Context, partitionInfo Par } defer partition.Close() - callback(partition.GetLogReader()) + if err := callback(partition.GetLogReader()); err != nil { + return fmt.Errorf("acknowledge consumer position: %w", err) + } return nil } @@ -376,11 +379,11 @@ func (la *LogEntryArchiver) receiveEntry(ctx context.Context, entry *logEntry) { la.waitDur = minRetryWait } - if err := la.callLogReader(ctx, entry.partitionInfo, func(lm storage.LogReader) { - lm.AcknowledgeConsumerPosition(entry.lsn) + if err := la.callLogReader(ctx, entry.partitionInfo, func(lm storage.LogReader) error { + return lm.AcknowledgePosition(log.ConsumerPosition, entry.lsn) }); err != nil { la.logger.WithError(err).WithFields( - log.Fields{ + logging.Fields{ "storage": entry.partitionInfo.StorageName, "partition_id": entry.partitionInfo.PartitionID, "lsn": entry.lsn, @@ -398,15 +401,16 @@ func (la *LogEntryArchiver) processEntries(ctx context.Context, inCh, outCh chan // processEntry checks if an existing backup exists, and performs a backup if not present. 
func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) { - logger := la.logger.WithFields(log.Fields{ + logger := la.logger.WithFields(logging.Fields{ "storage": entry.partitionInfo.StorageName, "partition_id": entry.partitionInfo.PartitionID, "lsn": entry.lsn, }) var entryPath string - if err := la.callLogReader(context.Background(), entry.partitionInfo, func(lm storage.LogReader) { + if err := la.callLogReader(context.Background(), entry.partitionInfo, func(lm storage.LogReader) error { entryPath = lm.GetEntryPath(entry.lsn) + return nil }); err != nil { la.backupCounter.WithLabelValues("fail").Add(1) la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for entry path") diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index 70b2113e1b089ff731703b23733b0a85bc262dae..bef0b03ecec205efb2d1755d3a5ed0c36c7d418e 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -76,7 +76,7 @@ type mockLogManager struct { storage.LogReader } -func (lm *mockLogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { +func (lm *mockLogManager) AcknowledgePosition(_ storage.PositionType, lsn storage.LSN) error { lm.Lock() defer lm.Unlock() @@ -109,6 +109,7 @@ func (lm *mockLogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { lm.finishCount-- lm.finishFunc() } + return nil } func (lm *mockLogManager) SendNotification() { @@ -607,7 +608,10 @@ func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { PartitionID: storage.PartitionID(i), } - logManager := log.NewManager(storageName, storage.PartitionID(i), testhelper.TempDir(t), testhelper.TempDir(t), archiver) + tracker := log.NewPositionTracker() + require.NoError(t, tracker.Register(log.ConsumerPosition)) + + logManager := log.NewManager(storageName, storage.PartitionID(i), testhelper.TempDir(t), testhelper.TempDir(t), archiver, tracker) require.NoError(t, logManager.Initialize(ctx, 0)) accessor.Lock() diff --git 
a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 4afbccf00a47476accb7c84fe9360175e40cf2e7..6581317840e9644610bda42467f9b6f113bb5a1c 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -169,15 +169,9 @@ type LogReader interface { // GetEntryPath returns the path of the log entry's root directory. GetEntryPath(lsn LSN) string - // The following functions allows other components acknowledge their positions. The log manager uses those - // positions to prune entries. Those interfaces are not great. We have a plan to refator them in: - // https://gitlab.com/gitlab-org/gitaly/-/issues/6528 - - // AcknowledgeConsumerPosition acknowledges log entries up and including lsn as successfully processed - // for the specified LogConsumer. - AcknowledgeConsumerPosition(lsn LSN) - // AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. - AcknowledgeAppliedPosition(lsn LSN) + // AcknowledgePosition acknowledges log entries up to and including lsn as successfully processed + // for the specified position type. + AcknowledgePosition(PositionType, LSN) error } // LogManager is the interface used to manage the underlying Write-Ahead Log entries. @@ -203,6 +197,13 @@ type LogManager interface { GetNotificationQueue() <-chan error } +// PositionType implements storage.LogPositionType. It's a specific type of position to be tracked in the +// Write-Ahead Log (WAL) tracking system. It defines whether changes to this position type should trigger notifications. +type PositionType struct { + Name string + ShouldNotify bool +} + // Partition is responsible for a single partition of data. type Partition interface { // Begin begins a transaction against the partition. 
diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 94aa40513d2d86f85f5b7e5cfbffcfda6667352f..82a92808977862e08d07372444c31970d2665176 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -43,7 +43,14 @@ func (f Factory) New( panic(fmt.Errorf("building a partition for a non-existent storage: %q", storageName)) } - logManager := log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer) + positionTracker := log.NewPositionTracker() + if f.logConsumer != nil { + if err := positionTracker.Register(log.ConsumerPosition); err != nil { + panic(err) + } + } + + logManager := log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer, positionTracker) return NewTransactionManager( partitionID, logger, diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index f44046aad14b3ac7678076c2b799dc1591fe380a..d66726360dd451dc61eb216e97e82ee62a9ba72b 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -9,7 +9,6 @@ import ( "path/filepath" "runtime/trace" "sync" - "sync/atomic" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" @@ -26,34 +25,6 @@ func EntryPath(stateDir string, lsn storage.LSN) string { return filepath.Join(StatePath(stateDir), lsn.String()) } -type positionType int - -const ( - // appliedPosition keeps track of latest applied position. WAL could not prune a log entry if it has not been applied. - appliedPosition positionType = iota + 1 - // consumerPosition keeps track of the latest consumer acknowledgement. 
- consumerPosition -) - -// position tracks the last LSN acknowledged for of a particular type. -type position struct { - lsn atomic.Value -} - -func newPosition() *position { - p := position{} - p.setPosition(0) - return &p -} - -func (p *position) getPosition() storage.LSN { - return p.lsn.Load().(storage.LSN) -} - -func (p *position) setPosition(pos storage.LSN) { - p.lsn.Store(pos) -} - // Manager is responsible for managing the Write-Ahead Log (WAL) entries on disk. It maintains the in-memory state // and indexing system that reflect the functional state of the WAL. The Manager ensures safe and consistent // proposals, applications, and prunings of log entries, acting as the interface for transactional log operations. It @@ -100,31 +71,31 @@ type Manager struct { // consumer is an external caller that may perform read-only operations against applied log entries. Log entries // are retained until the consumer has acknowledged past their LSN. consumer storage.LogConsumer - // positions tracks positions of log entries being used externally. Those positions are tracked so that WAL log - // entries are only pruned when they are not used anymore. - positions map[positionType]*position + // positionTracker tracks positions of log entries being used externally. Those positions are + // tracked so that WAL log entries are only pruned when they are not used anymore. + positionTracker *PositionTracker // notifyQueue is a queue notifying when there is a new change or there's something wrong with the log manager. notifyQueue chan error } // NewManager returns an instance of Manager. 
-func NewManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer storage.LogConsumer) *Manager { - positions := map[positionType]*position{ - appliedPosition: newPosition(), - } - if consumer != nil { - positions[consumerPosition] = newPosition() - } - +func NewManager( + storageName string, + partitionID storage.PartitionID, + stagingDirectory string, + stateDirectory string, + consumer storage.LogConsumer, + positionTracker *PositionTracker, +) *Manager { return &Manager{ - storageName: storageName, - partitionID: partitionID, - tmpDirectory: stagingDirectory, - stateDirectory: stateDirectory, - consumer: consumer, - positions: positions, - notifyQueue: make(chan error, 1), + storageName: storageName, + partitionID: partitionID, + tmpDirectory: stagingDirectory, + stateDirectory: stateDirectory, + consumer: consumer, + positionTracker: positionTracker, + notifyQueue: make(chan error, 1), } } @@ -174,37 +145,34 @@ func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) erro } } + mgr.positionTracker.Each(func(t string, _ storage.LSN) { + // Set acknowledged position to oldestLSN - 1. If the position were set to 0, the consumer would be unable to read + // the pruned entries anyway. + _ = mgr.positionTracker.Set(t, mgr.oldestLSN-1) + }) + if mgr.consumer != nil && mgr.appendedLSN != 0 { - // Set acknowledged position to oldestLSN - 1 and notify the consumer from oldestLSN -> appendedLSN. - // If set the position to 0, the consumer is unable to read pruned entry anyway. - mgr.AcknowledgeConsumerPosition(mgr.oldestLSN - 1) mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) } - mgr.AcknowledgeAppliedPosition(appliedLSN) return nil } -// AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. 
-func (mgr *Manager) AcknowledgeAppliedPosition(lsn storage.LSN) { - mgr.positions[appliedPosition].setPosition(lsn) - mgr.pruneLogEntries() -} - -// AcknowledgeConsumerPosition acknowledges log entries up and including LSN as successfully processed for the specified -// LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction. -func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { - if mgr.consumer == nil { - panic("log manager's consumer must be present prior to AcknowledgeConsumerPos call") +// AcknowledgePosition acknowledges the position of a position type. +func (mgr *Manager) AcknowledgePosition(t storage.PositionType, lsn storage.LSN) error { + if err := mgr.positionTracker.Set(t.Name, lsn); err != nil { + return fmt.Errorf("acknowledge position: %w", err) } - mgr.positions[consumerPosition].setPosition(lsn) - mgr.pruneLogEntries() // Alert the outsider. If it has a pending acknowledgement already no action is required. - select { - case mgr.notifyQueue <- nil: - default: + if t.ShouldNotify { + select { + case mgr.notifyQueue <- nil: + default: + } } + mgr.pruneLogEntries() + return nil } // GetNotificationQueue returns a notify channel so that caller can poll new changes. @@ -420,11 +388,11 @@ func (mgr *Manager) lowWaterMark() storage.LSN { // Position is the last acknowledged LSN, this is eligible for pruning. // lowWaterMark returns the lowest LSN that cannot be pruned, so add one. 
- for _, pos := range mgr.positions { - if pos.getPosition()+1 < minAcknowledged { - minAcknowledged = pos.getPosition() + 1 + mgr.positionTracker.Each(func(_ string, p storage.LSN) { + if p+1 < minAcknowledged { + minAcknowledged = p + 1 } - } + }) return minAcknowledged } diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 5df69f2deaaa07e343787c586daea94f75939172..2324a35936259bbc3cb9877bc354ef8e468407ff 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -29,8 +29,16 @@ func appendLogEntry(t *testing.T, manager *Manager, files map[string][]byte) sto return nextLSN } +func newTracker(t *testing.T, consumer storage.LogConsumer) *PositionTracker { + tracker := NewPositionTracker() + if consumer != nil { + require.NoError(t, tracker.Register(ConsumerPosition)) + } + return tracker +} + func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogConsumer) *Manager { - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer, newTracker(t, consumer)) require.NoError(t, logManager.Initialize(ctx, 0)) return logManager @@ -56,7 +64,7 @@ func TestLogManager_Initialize(t *testing.T) { ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) waitUntilPruningFinish(t, logManager) @@ -78,13 +86,13 @@ func TestLogManager_Initialize(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, 
stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) - logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) waitUntilPruningFinish(t, logManager) @@ -105,11 +113,12 @@ func TestLogManager_Initialize(t *testing.T) { t.Run("existing WAL entries with appliedLSN in-between", func(t *testing.T) { t.Parallel() + ctx := testhelper.Context(t) stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 3; i++ { @@ -117,9 +126,11 @@ func TestLogManager_Initialize(t *testing.T) { } require.NoError(t, logManager.Close()) - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 2)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) + waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(3), logManager.oldestLSN) require.Equal(t, storage.LSN(3), logManager.appendedLSN) @@ -140,7 +151,7 @@ func TestLogManager_Initialize(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) 
for i := 0; i < 3; i++ { @@ -148,8 +159,9 @@ func TestLogManager_Initialize(t *testing.T) { } require.NoError(t, logManager.Close()) - logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 3)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 3)) waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(4), logManager.oldestLSN) @@ -169,7 +181,7 @@ func TestLogManager_Initialize(t *testing.T) { ctx := testhelper.Context(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) // Attempt to initialize again @@ -186,7 +198,7 @@ func TestLogManager_Initialize(t *testing.T) { cancel() // Cancel the context before initializing stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) err := logManager.Initialize(ctx, 0) require.Error(t, err) @@ -198,7 +210,7 @@ func TestLogManager_Initialize(t *testing.T) { ctx, cancel := context.WithCancel(testhelper.Context(t)) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) // Cancel the context after initialization @@ -247,7 +259,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Set this entry as applied - logManager.AcknowledgeAppliedPosition(1) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 1)) waitUntilPruningFinish(t, 
logManager) // After removal @@ -268,12 +280,15 @@ func TestLogManager_PruneLogEntries(t *testing.T) { appendLogEntry(t, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) } + // Set the applied LSN to 2 + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) + // Manually set the consumer's position to the first entry, forcing low-water mark to retain it + require.NoError(t, logManager.AcknowledgePosition(ConsumerPosition, 1)) + // Before removal assertDirectoryState(t, logManager, testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: mode.Directory}, - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, "/wal/0000000000002": {Mode: mode.Directory}, "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, "/wal/0000000000003": {Mode: mode.Directory}, @@ -281,9 +296,9 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Set the applied LSN to 2 - logManager.AcknowledgeAppliedPosition(2) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) // Manually set the consumer's position to the first entry, forcing low-water mark to retain it - logManager.AcknowledgeConsumerPosition(1) + require.NoError(t, logManager.AcknowledgePosition(ConsumerPosition, 1)) waitUntilPruningFinish(t, logManager) require.Equal(t, storage.LSN(2), logManager.oldestLSN) @@ -326,7 +341,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Set the applied LSN to 3, allowing the first three entries to be pruned - logManager.AcknowledgeAppliedPosition(3) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 3)) waitUntilPruningFinish(t, logManager) // Ensure only entries starting from LSN 4 are retained @@ -349,7 +364,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, 
stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) for i := 0; i < 5; i++ { @@ -367,7 +382,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { require.NoError(t, os.Chmod(infectedPath, 0o444)) // The error is notified via notification queue so that the caller can act accordingly - logManager.AcknowledgeAppliedPosition(5) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 5)) require.ErrorContains(t, <-logManager.GetNotificationQueue(), "permission denied") require.NoError(t, logManager.Close()) @@ -386,8 +401,9 @@ func TestLogManager_PruneLogEntries(t *testing.T) { }) // Restart the manager - logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager = NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 5)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 5)) waitUntilPruningFinish(t, logManager) testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ @@ -402,7 +418,7 @@ func TestLogManager_PruneLogEntries(t *testing.T) { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) - logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil) + logManager := NewManager("test-storage", 1, stagingDir, stateDir, nil, newTracker(t, nil)) require.NoError(t, logManager.Initialize(ctx, 0)) var wg sync.WaitGroup @@ -427,12 +443,12 @@ func TestLogManager_PruneLogEntries(t *testing.T) { if logManager.AppendedLSN() == totalLSN { return } - logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN()) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, logManager.AppendedLSN())) } }() } wg.Wait() - logManager.AcknowledgeAppliedPosition(logManager.AppendedLSN()) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, logManager.AppendedLSN())) 
require.NoError(t, logManager.Close()) assertDirectoryState(t, logManager, testhelper.DirectoryState{ @@ -534,8 +550,8 @@ func TestLogManager_Positions(t *testing.T) { ctx := testhelper.Context(t) simulatePositions := func(t *testing.T, logManager *Manager, consumed storage.LSN, applied storage.LSN) { - logManager.AcknowledgeConsumerPosition(consumed) - logManager.AcknowledgeAppliedPosition(applied) + require.NoError(t, logManager.AcknowledgePosition(ConsumerPosition, consumed)) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, applied)) } t.Run("consumer pos is set to 0 after initialized", func(t *testing.T) { @@ -557,7 +573,7 @@ func TestLogManager_Positions(t *testing.T) { // Before restart mockConsumer := &mockLogConsumer{} - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer, newTracker(t, mockConsumer)) require.NoError(t, logManager.Initialize(ctx, 0)) appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) @@ -585,7 +601,7 @@ func TestLogManager_Positions(t *testing.T) { // Restart the log consumer. 
mockConsumer = &mockLogConsumer{} - logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer, newTracker(t, mockConsumer)) require.NoError(t, logManager.Initialize(ctx, 2)) // Notify consumer to consume from 2 -> 4 @@ -741,6 +757,63 @@ func TestLogManager_Positions(t *testing.T) { "/wal": {Mode: mode.Directory}, }) }) + + t.Run("more position types apart from defaults are supported", func(t *testing.T) { + consumer := &mockLogConsumer{} + + tracker := newTracker(t, consumer) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer, tracker) + + t1 := storage.PositionType{Name: "TestPosition1", ShouldNotify: false} + t2 := storage.PositionType{Name: "TestPosition2", ShouldNotify: false} + require.NoError(t, tracker.Register(t1)) + require.NoError(t, tracker.Register(t2)) + + require.NoError(t, logManager.Initialize(ctx, 0)) + + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) + + // Consumed = 3, Applied = 2, TestPosition1 = 1, testPosition2 = 1 + simulatePositions(t, logManager, 3, 2) + + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + // Consumed = 3, Applied = 3, TestPosition1 = 2, testPosition2 = 2 + 
require.NoError(t, logManager.AcknowledgePosition(t1, 2)) + require.NoError(t, logManager.AcknowledgePosition(t2, 2)) + simulatePositions(t, logManager, 3, 3) + + require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + // All positions are 3 + require.NoError(t, logManager.AcknowledgePosition(t1, 3)) + require.NoError(t, logManager.AcknowledgePosition(t2, 3)) + simulatePositions(t, logManager, 3, 3) + + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) } func TestLogManager_Close(t *testing.T) { @@ -748,7 +821,7 @@ func TestLogManager_Close(t *testing.T) { t.Run("close uninitialized manager", func(t *testing.T) { t.Parallel() - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil, newTracker(t, nil)) // Attempt to close the manager before initialization err := logManager.Close() @@ -759,7 +832,7 @@ func TestLogManager_Close(t *testing.T) { t.Run("close after initialization", func(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) - logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil) + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), nil, newTracker(t, nil)) // Properly initialize the manager require.NoError(t, logManager.Initialize(ctx, 0)) @@ -802,7 +875,7 @@ func TestLogManager_Close(t *testing.T) { appendLogEntry(t, logManager, map[string][]byte{"2": []byte("content-2")}) // Trigger pruning - 
logManager.AcknowledgeAppliedPosition(2) + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 2)) // Close the manager and ensure all tasks are completed require.NoError(t, logManager.Close()) diff --git a/internal/gitaly/storage/storagemgr/partition/log/positions.go b/internal/gitaly/storage/storagemgr/partition/log/positions.go new file mode 100644 index 0000000000000000000000000000000000000000..842f80188d0aa3154cfb121c97e6f9531208f095 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log/positions.go @@ -0,0 +1,82 @@ +// positions.go +package log + +import ( + "fmt" + "sync/atomic" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +var ( + // AppliedPosition keeps track of the latest applied position. WAL cannot prune a log entry if it has not been applied. + AppliedPosition = storage.PositionType{Name: "AppliedPosition", ShouldNotify: false} + // ConsumerPosition keeps track of the latest consumer acknowledgment. + ConsumerPosition = storage.PositionType{Name: "ConsumerPosition", ShouldNotify: true} +) + +// position tracks the last LSN acknowledged for a particular type. +type position struct { + atomic.Value +} + +func newPosition() *position { + p := position{} + p.setPosition(0) + return &p +} + +func (p *position) getPosition() storage.LSN { + return p.Load().(storage.LSN) +} + +func (p *position) setPosition(pos storage.LSN) { + p.Store(pos) +} + +// PositionTracker manages positions for various position types. +type PositionTracker struct { + positions map[string]*position +} + +// NewPositionTracker creates and initializes a new PositionTracker. +func NewPositionTracker() *PositionTracker { + return &PositionTracker{ + positions: map[string]*position{ + AppliedPosition.Name: newPosition(), + }, + } +} + +// Register adds a new position type to the tracker. 
+func (p *PositionTracker) Register(t storage.PositionType) error { + if _, exist := p.positions[t.Name]; exist { + return fmt.Errorf("position type %q already registered", t.Name) + } + p.positions[t.Name] = newPosition() + return nil +} + +// Set updates the position for a given type. +func (p *PositionTracker) Set(t string, lsn storage.LSN) error { + if _, exist := p.positions[t]; !exist { + return fmt.Errorf("acknowledged an unregistered position type %q", t) + } + p.positions[t].setPosition(lsn) + return nil +} + +// Get retrieves the position for a given type. +func (p *PositionTracker) Get(t string) (storage.LSN, error) { + if _, exist := p.positions[t]; !exist { + return 0, fmt.Errorf("acknowledged an unregistered position type %q", t) + } + return p.positions[t].getPosition(), nil +} + +// Each iterates through the list of tracked positions and yields the callback with corresponding LSN. +func (p *PositionTracker) Each(callback func(string, storage.LSN)) { + for t, pos := range p.positions { + callback(t, pos.getPosition()) + } +} diff --git a/internal/gitaly/storage/storagemgr/partition/log/positions_test.go b/internal/gitaly/storage/storagemgr/partition/log/positions_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e26457779afcd22fc67397a4dfee138bcd842ccc --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log/positions_test.go @@ -0,0 +1,133 @@ +package log + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +func TestPositionTracker(t *testing.T) { + t.Parallel() + + t.Run("Set and Get AppliedPosition and ConsumerPosition", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + require.NoError(t, tracker.Set(AppliedPosition.Name, storage.LSN(5))) + + pos, err := tracker.Get(AppliedPosition.Name) + require.NoError(t, err) + require.Equal(t, storage.LSN(5), pos) + + require.NoError(t, 
tracker.Register(ConsumerPosition)) + require.NoError(t, tracker.Set(ConsumerPosition.Name, storage.LSN(10))) + + pos, err = tracker.Get(ConsumerPosition.Name) + require.NoError(t, err) + require.Equal(t, storage.LSN(10), pos) + }) + + t.Run("Set and Get single position multiple times", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + testPosition := storage.PositionType{Name: "TestPosition", ShouldNotify: false} + require.NoError(t, tracker.Register(testPosition)) + + for i := 1; i <= 3; i++ { + require.NoError(t, tracker.Set(testPosition.Name, storage.LSN(i))) + pos, err := tracker.Get(testPosition.Name) + require.NoError(t, err) + require.Equal(t, storage.LSN(i), pos) + } + }) + + t.Run("Set and Get multiple positions", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + positions := []storage.PositionType{ + {Name: "Position1", ShouldNotify: false}, + {Name: "Position2", ShouldNotify: true}, + } + values := []storage.LSN{5, 10} + + for i, posType := range positions { + require.NoError(t, tracker.Register(posType)) + require.NoError(t, tracker.Set(posType.Name, values[i])) + } + + for i, posType := range positions { + pos, err := tracker.Get(posType.Name) + require.NoError(t, err) + require.Equal(t, values[i], pos) + } + }) + + t.Run("Double register position type", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + testPosition := storage.PositionType{Name: "TestPosition", ShouldNotify: false} + require.NoError(t, tracker.Register(testPosition)) + err := tracker.Register(testPosition) + require.EqualError(t, err, "position type \"TestPosition\" already registered") + }) + + t.Run("Duplicated name register", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + testPosition1 := storage.PositionType{Name: "TestPosition", ShouldNotify: false} + testPosition2 := storage.PositionType{Name: "TestPosition", ShouldNotify: true} + + require.NoError(t, 
tracker.Register(testPosition1)) + err := tracker.Register(testPosition2) + require.EqualError(t, err, "position type \"TestPosition\" already registered") + }) + + t.Run("Ack unregistered position", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + posType := storage.PositionType{Name: "Unregistered", ShouldNotify: false} + + err := tracker.Set(posType.Name, storage.LSN(1)) + require.EqualError(t, err, "acknowledged an unregistered position type \"Unregistered\"") + }) + + t.Run("Get unregistered position", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + posType := storage.PositionType{Name: "Unregistered", ShouldNotify: false} + + _, err := tracker.Get(posType.Name) + require.EqualError(t, err, "acknowledged an unregistered position type \"Unregistered\"") + }) + + t.Run("Range over positions", func(t *testing.T) { + t.Parallel() + + tracker := NewPositionTracker() + positions := []storage.PositionType{ + {Name: "Position1", ShouldNotify: false}, + {Name: "Position2", ShouldNotify: true}, + } + values := []storage.LSN{5, 10} + + for i, posType := range positions { + require.NoError(t, tracker.Register(posType)) + require.NoError(t, tracker.Set(posType.Name, values[i])) + } + + trackedPositions := map[string]storage.LSN{} + tracker.Each(func(name string, lsn storage.LSN) { + trackedPositions[name] = lsn + }) + + require.Equal(t, storage.LSN(5), trackedPositions["Position1"]) + require.Equal(t, storage.LSN(10), trackedPositions["Position2"]) + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go index c783bd0b3538ffdd84c64182bb8c644234ffd8bc..29944a1357a1fbc5b2ae98fd87deb2bbffeee889 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go @@ -20,7 +20,6 @@ import ( 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) @@ -156,28 +155,12 @@ func TestMigrationManager_Begin(t *testing.T) { cache := catfile.NewCache(cfg) defer cache.Stop() - repositoryFactory, err := localrepo.NewFactory( - logger, config.NewLocator(cfg), cmdFactory, cache, - ).ScopeByStorage(ctx, cfg.Storages[0].Name) - require.NoError(t, err) + repositoryFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, cache) - m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)).Scope(storageName) - - logManager := log.NewManager(storageName, testPartitionID, stagingDir, stateDir, nil) - tm := partition.NewTransactionManager( - testPartitionID, - logger, - database, - storageName, - storagePath, - stateDir, - stagingDir, - cmdFactory, - repositoryFactory, - m, - logManager, - ) + m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)) + factory := partition.NewFactory(cmdFactory, repositoryFactory, m, nil) + tm := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir) mm := migrationManager{ Partition: tm, logger: logger, diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 28ae099b883478ea806da0c57a2e50caf533aa50..24671dad1fc9afb5f7b420cb00569adc1f73c9c9 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1043,16 +1043,17 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas stagingDir := 
filepath.Join(storagePath, "staging") require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) - newMetrics := func() ManagerMetrics { - return NewMetrics(housekeeping.NewMetrics(setup.Config.Prometheus)).Scope(storageName) + newMetrics := func() Metrics { + return NewMetrics(housekeeping.NewMetrics(setup.Config.Prometheus)) } var ( // managerRunning tracks whether the manager is running or closed. managerRunning bool + // factory is the factory that produces the current TransactionManager + factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer) // transactionManager is the current TransactionManager instance. - logManager = log.NewManager(storageName, setup.PartitionID, stagingDir, stateDir, setup.Consumer) - transactionManager = NewTransactionManager(setup.PartitionID, logger, database, storageName, storagePath, stateDir, stagingDir, setup.CommandFactory, storageScopedFactory, newMetrics(), logManager) + transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) // managerErr is used for synchronizing manager closing and returning // the error from Run. 
managerErr chan error @@ -1101,8 +1102,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.NoError(t, os.RemoveAll(stagingDir)) require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) - logManager := log.NewManager(storageName, setup.PartitionID, stagingDir, stateDir, setup.Consumer) - transactionManager = NewTransactionManager(setup.PartitionID, logger, database, setup.Config.Storages[0].Name, storagePath, stateDir, stagingDir, setup.CommandFactory, storageScopedFactory, newMetrics(), logManager) + transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) installHooks(transactionManager, &inflightTransactions, step.Hooks) go func() { @@ -1420,7 +1420,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction := openTransactions[step.TransactionID] transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: - transactionManager.logManager.AcknowledgeConsumerPosition(step.LSN) + require.NoError(t, transactionManager.logManager.AcknowledgePosition(log.ConsumerPosition, step.LSN)) case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 05a41952df1b8c7d1d844491586e41b6e8aed4a8..4ad8485e11d759d33f29c6f987df5a03986e17e5 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -35,6 +35,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/fshistory" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal/reftree" @@ -2355,6 +2356,9 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { if err := mgr.logManager.Initialize(ctx, mgr.appliedLSN); err != nil { return fmt.Errorf("initialize log management: %w", err) } + if err := mgr.logManager.AcknowledgePosition(log.AppliedPosition, mgr.appliedLSN); err != nil { + return fmt.Errorf("acknowledge applied LSN: %w", err) + } if err := os.Mkdir(mgr.snapshotsDir(), mode.Directory); err != nil { return fmt.Errorf("create snapshot manager directory: %w", err) @@ -3074,7 +3078,9 @@ func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error { if err := mgr.setKey(keyAppliedLSN, lsn.ToProto()); err != nil { return err } - mgr.logManager.AcknowledgeAppliedPosition(lsn) + if err := mgr.logManager.AcknowledgePosition(log.AppliedPosition, lsn); err != nil { + return fmt.Errorf("acknowledge applied LSN: %w", err) + } mgr.appliedLSN = lsn return nil } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index 37246625020f12fd91453233f614b13067c28b12..e29c0d51cf4c9d286a71f36b84641560a3b68f32 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -379,10 +379,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti }, CloseManager{}, StartManager{}, - AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - // Wait 
until the first acknowledgement after restart - <-tm.logManager.GetNotificationQueue() - }), }, expectedState: StateAssertion{ Database: DatabaseState{ diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 45daec4e61455a6ec2e48df53a315c9b61fbc56d..ccf028305a91171ed7d5644fba582c9bb19fb17a 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -2257,7 +2257,11 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t require.NoError(t, err) require.NoError(t, os.WriteFile(manifestPath(logEntryPath), manifestBytes, mode.File)) - logManager := log.NewManager(tm.storageName, setup.PartitionID, testhelper.TempDir(t), filepath.Join(tm.storagePath, "state"), setup.Consumer) + tracker := log.NewPositionTracker() + if setup.Consumer != nil { + require.NoError(t, tracker.Register(log.ConsumerPosition)) + } + logManager := log.NewManager(tm.storageName, setup.PartitionID, testhelper.TempDir(t), filepath.Join(tm.storagePath, "state"), setup.Consumer, tracker) require.NoError(t, logManager.Initialize(ctx, 3)) lsn, err := logManager.AppendLogEntry(logEntryPath) require.NoError(t, err) @@ -2381,10 +2385,7 @@ func BenchmarkTransactionManager(b *testing.B) { managers []*TransactionManager ) - repositoryFactory, err := localrepo.NewFactory( - logger, config.NewLocator(cfg), cmdFactory, cache, - ).ScopeByStorage(ctx, cfg.Storages[0].Name) - require.NoError(b, err) + repositoryFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, cache) // transactionWG tracks the number of on going transaction. 
var transactionWG sync.WaitGroup @@ -2405,13 +2406,14 @@ func BenchmarkTransactionManager(b *testing.B) { stagingDir := filepath.Join(storagePath, "staging", strconv.Itoa(i)) require.NoError(b, os.MkdirAll(stagingDir, mode.Directory)) - m := NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)).Scope(storageName) + m := NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)) // Valid partition IDs are >=1. testPartitionID := storage.PartitionID(i + 1) - logManager := log.NewManager(storageName, testPartitionID, stagingDir, stateDir, nil) - manager := NewTransactionManager(testPartitionID, logger, database, storageName, storagePath, stateDir, stagingDir, cmdFactory, repositoryFactory, m, logManager) + factory := NewFactory(cmdFactory, repositoryFactory, m, nil) + // transactionManager is the current TransactionManager instance. + manager := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) managers = append(managers, manager) @@ -2421,7 +2423,10 @@ func BenchmarkTransactionManager(b *testing.B) { assert.NoError(b, manager.Run()) }() - objectHash, err := repositoryFactory.Build(repo.GetRelativePath()).ObjectHash(ctx) + scopedRepositoryFactory, err := repositoryFactory.ScopeByStorage(ctx, cfg.Storages[0].Name) + require.NoError(b, err) + + objectHash, err := scopedRepositoryFactory.Build(repo.GetRelativePath()).ObjectHash(ctx) require.NoError(b, err) for updaterID := 0; updaterID < tc.concurrentUpdaters; updaterID++ {