diff --git a/internal/backup/log_entry.go b/internal/backup/log_entry.go index e569b09d63cf9dd6fad3f7bb9572b7f4c45b22f1..91b82c7ed2629951c567726104247d0dd6406947 100644 --- a/internal/backup/log_entry.go +++ b/internal/backup/log_entry.go @@ -39,7 +39,7 @@ func newLogEntry(partitionInfo PartitionInfo, lsn storage.LSN) *logEntry { } } -// partitionNotification is used to store the data received by NotifyNewTransactions. +// partitionNotification is used to store the data received by NotifyNewEntries. type partitionNotification struct { lowWaterMark storage.LSN highWaterMark storage.LSN @@ -82,16 +82,6 @@ type PartitionInfo struct { PartitionID storage.PartitionID } -// LogManager is the interface used on the consumer side of the integration. The consumer -// has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction. -type LogManager interface { - // AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed - // for the specified LogConsumer. - AcknowledgeTransaction(lsn storage.LSN) - // GetTransactionPath returns the path of the log entry's root directory. - GetTransactionPath(lsn storage.LSN) string -} - // LogEntryArchiver is used to backup applied log entries. It has a configurable number of // worker goroutines that will perform backups. Each partition may only have one backup // executing at a time, entries are always processed in-order. Backup failures will trigger @@ -182,8 +172,8 @@ func newLogEntryArchiver(logger log.Logger, archiveSink *Sink, workerCount uint, return archiver } -// NotifyNewTransactions passes the transaction information to the LogEntryArchiver for processing. -func (la *LogEntryArchiver) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { +// NotifyNewEntries passes the log entry information to the LogEntryArchiver for processing. +func (la *LogEntryArchiver) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { la.notificationsMutex.Lock() defer la.notificationsMutex.Unlock() @@ -304,8 +294,8 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { // We have already backed up all entries sent by the LogManager, but the manager is // not aware of this. Acknowledge again with our last processed entry. 
if state.nextLSN > notification.highWaterMark { - if err := la.callLogManager(ctx, notification.partitionInfo, func(lm LogManager) { - lm.AcknowledgeTransaction(state.nextLSN - 1) + if err := la.callLogManager(ctx, notification.partitionInfo, func(lm storage.LogManager) { + lm.AcknowledgeConsumerPosition(state.nextLSN - 1) }); err != nil { la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for already completed entry") } @@ -338,7 +328,7 @@ func (la *LogEntryArchiver) ingestNotifications(ctx context.Context) { } } -func (la *LogEntryArchiver) callLogManager(ctx context.Context, partitionInfo PartitionInfo, callback func(lm LogManager)) error { +func (la *LogEntryArchiver) callLogManager(ctx context.Context, partitionInfo PartitionInfo, callback func(lm storage.LogManager)) error { storageHandle, err := (*la.node).GetStorage(partitionInfo.StorageName) if err != nil { return fmt.Errorf("get storage: %w", err) @@ -350,12 +340,7 @@ func (la *LogEntryArchiver) callLogManager(ctx context.Context, partitionInfo Pa } defer partition.Close() - logManager, ok := partition.(LogManager) - if !ok { - return fmt.Errorf("expected LogManager, got %T", logManager) - } - - callback(logManager) + callback(partition.GetLogManager()) return nil } @@ -391,8 +376,8 @@ func (la *LogEntryArchiver) receiveEntry(ctx context.Context, entry *logEntry) { la.waitDur = minRetryWait } - if err := la.callLogManager(ctx, entry.partitionInfo, func(lm LogManager) { - lm.AcknowledgeTransaction(entry.lsn) + if err := la.callLogManager(ctx, entry.partitionInfo, func(lm storage.LogManager) { + lm.AcknowledgeConsumerPosition(entry.lsn) }); err != nil { la.logger.WithError(err).WithFields( log.Fields{ @@ -420,8 +405,8 @@ func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) { }) var entryPath string - if err := la.callLogManager(context.Background(), entry.partitionInfo, func(lm LogManager) { - entryPath = lm.GetTransactionPath(entry.lsn) + if err := la.callLogManager(context.Background(), entry.partitionInfo, func(lm storage.LogManager) { + entryPath = lm.GetEntryPath(entry.lsn) }); err != nil { la.backupCounter.WithLabelValues("fail").Add(1) la.logger.WithError(err).Error("log entry archiver: failed to get LogManager for entry path") @@ -451,7 +436,7 @@ func (la *LogEntryArchiver) processEntry(ctx context.Context, entry *logEntry) { entry.success = true } -// backupLogEntry tar's the root directory of the transaction and writes it to the Sink. +// backupLogEntry tar's the root directory of the log entry and writes it to the Sink. 
func (la *LogEntryArchiver) backupLogEntry(ctx context.Context, partitionInfo PartitionInfo, lsn storage.LSN, entryPath string) (returnErr error) { timer := prometheus.NewTimer(la.backupLatency) defer timer.ObserveDuration() diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index b32534d9f8571bfc6d3d3a13c271a2ec46621c99..91eaf2059ed40588174c922801e7ef4318bbeb6a 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -18,13 +18,25 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/archive" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) +type mockPartition struct { + storage.Partition + manager storage.LogManager +} + +func (p *mockPartition) Close() {} + +func (p *mockPartition) GetLogManager() storage.LogManager { + return p.manager +} + type mockNode struct { - managers map[PartitionInfo]*mockLogManager - t *testing.T + partitions map[PartitionInfo]*mockPartition + t *testing.T sync.Mutex } @@ -44,7 +56,7 @@ func (m mockStorage) GetPartition(ctx context.Context, partitionID storage.Parti defer m.node.Unlock() info := PartitionInfo{m.storageName, partitionID} - mgr, ok := m.node.managers[info] + mgr, ok := m.node.partitions[info] assert.True(m.node.t, ok) return mgr, nil @@ -61,12 +73,11 @@ type mockLogManager struct { finishCount int sync.Mutex - storage.Partition } func (lm *mockLogManager) Close() {} -func (lm *mockLogManager) AcknowledgeTransaction(lsn storage.LSN) { +func (lm *mockLogManager) AcknowledgeConsumerPosition(lsn storage.LSN) { lm.Lock() defer lm.Unlock() @@ -103,12 +114,12 @@ func (lm *mockLogManager) AcknowledgeTransaction(lsn storage.LSN) { func (lm *mockLogManager) SendNotification() { n := lm.notifications[0] - lm.archiver.NotifyNewTransactions(lm.partitionInfo.StorageName, lm.partitionInfo.PartitionID, n.lowWaterMark, n.highWaterMark) + lm.archiver.NotifyNewEntries(lm.partitionInfo.StorageName, lm.partitionInfo.PartitionID, n.lowWaterMark, n.highWaterMark) lm.notifications = lm.notifications[1:] } -func (lm *mockLogManager) GetTransactionPath(lsn storage.LSN) string { +func (lm *mockLogManager) GetEntryPath(lsn storage.LSN) string { return filepath.Join(partitionPath(lm.entryRootPath, lm.partitionInfo), lsn.String()) } @@ -295,8 +306,8 @@ func TestLogEntryArchiver(t *testing.T) { var wg sync.WaitGroup accessor := &mockNode{ - managers: make(map[PartitionInfo]*mockLogManager, len(tc.partitions)), - t: t, + partitions: make(map[PartitionInfo]*mockPartition, len(tc.partitions)), + t: t, } node := storage.Node(accessor) @@ -330,7 +341,7 @@ func TestLogEntryArchiver(t *testing.T) { managers[info] = manager accessor.Lock() - accessor.managers[info] = manager + accessor.partitions[info] = &mockPartition{manager: manager} accessor.Unlock() // Send partitions in parallel to mimic real usage. 
@@ -358,7 +369,8 @@ func TestLogEntryArchiver(t *testing.T) { cmpDir := testhelper.TempDir(t) require.NoError(t, os.Mkdir(filepath.Join(cmpDir, storageName), mode.Directory)) - for info, manager := range accessor.managers { + for info, partition := range accessor.partitions { + manager := partition.GetLogManager().(*mockLogManager) lastAck := manager.acknowledged[len(manager.acknowledged)-1] require.Equal(t, tc.finalLSN, lastAck) @@ -417,8 +429,8 @@ func TestLogEntryArchiver_retry(t *testing.T) { require.NoError(t, err) accessor := &mockNode{ - managers: make(map[PartitionInfo]*mockLogManager, 1), - t: t, + partitions: make(map[PartitionInfo]*mockPartition, 1), + t: t, } node := storage.Node(accessor) @@ -447,7 +459,7 @@ func TestLogEntryArchiver_retry(t *testing.T) { } accessor.Lock() - accessor.managers[info] = manager + accessor.partitions[info] = &mockPartition{manager: manager} accessor.Unlock() wg.Add(1) @@ -562,3 +574,124 @@ func buildMetrics(t *testing.T, successCt, failCt int) string { return builder.String() } + +// TestLogEntryArchiver_WithRealLogManager runs the log archiver with a real WAL log manager. This test aims to assert +// the flow between the two components to avoid uncaught errors due to mocking. Test setup and verification are +// extremely verbose. There's no reliable way to simulate more sophisticated scenarios because the test could not +// intercept internal states of log archiver or log manager. The log manager doesn't expose the current consumer +// position. Thus, this test verifies the most basic archiving. Unit tests using mocking will cover the rest. +func TestLogEntryArchiver_WithRealLogManager(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + // Setup node + accessor := &mockNode{ + partitions: make(map[PartitionInfo]*mockPartition), + t: t, + } + node := storage.Node(accessor) + + // Setup archiver + archivePath := testhelper.TempDir(t) + archiveSink, err := ResolveSink(ctx, archivePath) + require.NoError(t, err) + + archiver := NewLogEntryArchiver(testhelper.NewLogger(t), archiveSink, 1, &node) + archiver.Run() + + // Setup WAL log manager and plug it to storage + const storageName = "default" + var logManagers []*log.Manager + for i := 1; i <= 2; i++ { + info := PartitionInfo{ + StorageName: storageName, + PartitionID: storage.PartitionID(i), + } + + logManager := log.NewManager(storageName, storage.PartitionID(i), testhelper.TempDir(t), testhelper.TempDir(t), archiver) + require.NoError(t, logManager.Initialize(ctx, 0)) + + accessor.Lock() + accessor.partitions[info] = &mockPartition{manager: logManager} + accessor.Unlock() + + logManagers = append(logManagers, logManager) + } + + appendLogEntry(t, ctx, logManagers[0], map[string][]byte{ + "1": []byte("content-1"), + "2": []byte("content-2"), + "3": []byte("content-3"), + }) + <-logManagers[0].NotifyQueue() + + // KV operation without any file. 
+ appendLogEntry(t, ctx, logManagers[1], map[string][]byte{}) + <-logManagers[1].NotifyQueue() + + appendLogEntry(t, ctx, logManagers[1], map[string][]byte{ + "4": []byte("content-4"), + "5": []byte("content-5"), + "6": []byte("content-6"), + }) + <-logManagers[1].NotifyQueue() + + archiver.Close() + + cmpDir := testhelper.TempDir(t) + require.NoError(t, os.Mkdir(filepath.Join(cmpDir, storageName), mode.Directory)) + + lsnPrefix := storage.LSN(1).String() + + // Assert manager 1 + tarPath := filepath.Join(partitionPath(archivePath, PartitionInfo{storageName, storage.PartitionID(1)}), storage.LSN(1).String()+".tar") + tar, err := os.Open(tarPath) + require.NoError(t, err) + testhelper.RequireTarState(t, tar, testhelper.DirectoryState{ + lsnPrefix: {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, + filepath.Join(lsnPrefix, "1"): {Mode: archive.TarFileMode, Content: []byte("content-1")}, + filepath.Join(lsnPrefix, "2"): {Mode: archive.TarFileMode, Content: []byte("content-2")}, + filepath.Join(lsnPrefix, "3"): {Mode: archive.TarFileMode, Content: []byte("content-3")}, + }) + testhelper.MustClose(t, tar) + + // Assert manager 2 + tarPath = filepath.Join(partitionPath(archivePath, PartitionInfo{storageName, storage.PartitionID(2)}), lsnPrefix+".tar") + tar, err = os.Open(tarPath) + require.NoError(t, err) + testhelper.RequireTarState(t, tar, testhelper.DirectoryState{ + lsnPrefix: {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, + }) + testhelper.MustClose(t, tar) + + lsnPrefix = storage.LSN(2).String() + tarPath = filepath.Join(partitionPath(archivePath, PartitionInfo{storageName, storage.PartitionID(2)}), lsnPrefix+".tar") + tar, err = os.Open(tarPath) + require.NoError(t, err) + + testhelper.RequireTarState(t, tar, testhelper.DirectoryState{ + lsnPrefix: {Mode: archive.TarFileMode | archive.ExecuteMode | fs.ModeDir}, + filepath.Join(lsnPrefix, "4"): {Mode: archive.TarFileMode, Content: []byte("content-4")}, + filepath.Join(lsnPrefix, "5"): {Mode: archive.TarFileMode, Content: []byte("content-5")}, + filepath.Join(lsnPrefix, "6"): {Mode: archive.TarFileMode, Content: []byte("content-6")}, + }) + testhelper.MustClose(t, tar) + + // Finally, assert the metrics. 
+ testhelper.RequirePromMetrics(t, archiver.backupCounter, buildMetrics(t, 3, 0)) +} + +func appendLogEntry(t *testing.T, ctx context.Context, manager *log.Manager, files map[string][]byte) storage.LSN { + t.Helper() + + logEntryPath := testhelper.TempDir(t) + for name, value := range files { + path := filepath.Join(logEntryPath, name) + require.NoError(t, os.WriteFile(path, value, mode.File)) + } + + nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + require.NoError(t, err) + + return nextLSN +} diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index f4a27a343868238380028e6311f2831ddb6b9c38..5a17edf5e66c3dd5e501709bf4168f2ce5942836 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -390,7 +390,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { } defer dbMgr.Close() - var logConsumer partition.LogConsumer + var logConsumer storage.LogConsumer if cfg.Backup.WALGoCloudURL != "" { walSink, err := backup.ResolveSink(ctx, cfg.Backup.WALGoCloudURL) if err != nil { diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index d79ade690665ae0de494dce89641ecdd4ceac6a8..9e0432f097d740cac98eaa677ad0dc1d248670e2 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -139,12 +139,38 @@ type BeginOptions struct { ForceExclusiveSnapshot bool } +// LogConsumer is the interface of a log consumer that is passed to a TransactionManager. +// The LogConsumer may perform read-only operations against the on-disk log entry. +// The TransactionManager notifies the consumer of new log entries by invoking the +// NotifyNewEntries method after they are committed. +type LogConsumer interface { + // NotifyNewEntries alerts the LogConsumer that new log entries are available for + // consumption. The method is invoked both when the TransactionManager + // initializes and when new transactions are committed. Both the low and high water mark + // LSNs are sent so that a newly initialized consumer is aware of the full range of + // entries it can process. + NotifyNewEntries(storageName string, partitionID PartitionID, lowWaterMark, highWaterMark LSN) +} + +// LogManager is the interface used on the consumer side of the integration. The consumer +// has the ability to acknowledge log entries as having been processed with AcknowledgeConsumerPosition. +type LogManager interface { + // AcknowledgeConsumerPosition acknowledges log entries up to and including lsn as successfully processed + // for the specified LogConsumer. + AcknowledgeConsumerPosition(lsn LSN) + // GetEntryPath returns the path of the log entry's root directory. + GetEntryPath(lsn LSN) string +} + // Partition is responsible for a single partition of data. type Partition interface { // Begin begins a transaction against the partition. Begin(context.Context, BeginOptions) (Transaction, error) // Close closes the partition handle to signal the caller is done using it. Close() + // GetLogManager provides controlled access to the underlying log management system for log consumption purposes. It + // allows consumers to access the on-disk location of an LSN and to acknowledge the consumed position. + GetLogManager() LogManager } // TransactionOptions are used to pass transaction options into Begin.
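To make the consumer-side contract above concrete, here is a hedged sketch (not part of this change) of a storage.LogConsumer that resolves a partition through storage.Node, grabs its LogManager via GetLogManager, and acknowledges the processed range, which is the same flow LogEntryArchiver.callLogManager uses in internal/backup/log_entry.go. The entryArchiver type, its inline processing, and its logging are illustrative assumptions; only the LogConsumer, LogManager, Partition.GetLogManager, GetStorage, and GetPartition signatures come from this diff.

package example

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
)

// entryArchiver is a hypothetical LogConsumer that processes every notified entry
// synchronously and acknowledges it through the partition's LogManager.
type entryArchiver struct {
	node storage.Node
}

// NotifyNewEntries implements storage.LogConsumer. The log manager passes the full
// [lowWaterMark, highWaterMark] range of LSNs that are currently consumable.
func (a *entryArchiver) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) {
	// A real consumer would queue this work on worker goroutines; processing inline keeps the sketch short.
	if err := a.process(context.Background(), storageName, partitionID, lowWaterMark, highWaterMark); err != nil {
		fmt.Printf("processing log entries: %v\n", err)
	}
}

func (a *entryArchiver) process(ctx context.Context, storageName string, partitionID storage.PartitionID, low, high storage.LSN) error {
	storageHandle, err := a.node.GetStorage(storageName)
	if err != nil {
		return fmt.Errorf("get storage: %w", err)
	}

	partition, err := storageHandle.GetPartition(ctx, partitionID)
	if err != nil {
		return fmt.Errorf("get partition: %w", err)
	}
	defer partition.Close()

	logManager := partition.GetLogManager()
	for lsn := low; lsn <= high; lsn++ {
		// GetEntryPath points at the entry's on-disk root directory; a real consumer
		// would read or archive its contents here.
		_ = logManager.GetEntryPath(lsn)
	}

	// Acknowledging the highest processed LSN allows the log manager to prune up to it.
	logManager.AcknowledgeConsumerPosition(high)
	return nil
}

A production consumer such as LogEntryArchiver instead queues the notification, processes entries on worker goroutines, and retries failed backups before acknowledging.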
diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index e2fabf936fd5cb62dac94ea20439e5ca79648391..7f85ce20aa8ffb91e6367d069b8e5c9bdbc2dc24 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -12,25 +12,12 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) -// LogConsumer is the interface of a log consumer that is passed to a TransactionManager. -// The LogConsumer may perform read-only operations against the on-disk log entry. -// The TransactionManager notifies the consumer of new transactions by invoking the -// NotifyNewTransaction method after they are committed. -type LogConsumer interface { - // NotifyNewTransactions alerts the LogConsumer that new log entries are available for - // consumption. The method invoked both when the TransactionManager - // initializes and when new transactions are committed. Both the low and high water mark - // LSNs are sent so that a newly initialized consumer is aware of the full range of - // entries it can process. - NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) -} - // Factory is factory type that can create new partitions. type Factory struct { cmdFactory gitcmd.CommandFactory repoFactory localrepo.Factory metrics Metrics - logConsumer LogConsumer + logConsumer storage.LogConsumer } // New returns a new Partition instance. @@ -75,7 +62,7 @@ func NewFactory( cmdFactory gitcmd.CommandFactory, repoFactory localrepo.Factory, metrics Metrics, - logConsumer LogConsumer, + logConsumer storage.LogConsumer, ) Factory { return Factory{ cmdFactory: cmdFactory, diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go new file mode 100644 index 0000000000000000000000000000000000000000..dbae71e2eed49afa873dde6775ef38b88c35ffc9 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -0,0 +1,367 @@ +package log + +import ( + "context" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "runtime/trace" + "sync" + "sync/atomic" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/safe" +) + +// StatePath returns the WAL directory's path. +func StatePath(stateDir string) string { + return filepath.Join(stateDir, "wal") +} + +// EntryPath returns an absolute path to a given log entry's WAL files. +func EntryPath(stateDir string, lsn storage.LSN) string { + return filepath.Join(StatePath(stateDir), lsn.String()) +} + +type positionType int + +const ( + // appliedPosition keeps track of latest applied position. WAL could not prune a log entry if it has not been applied. + appliedPosition positionType = iota + 1 + // consumerPosition keeps track of the latest consumer acknowledgement. + consumerPosition +) + +// position tracks the last LSN acknowledged for of a particular type. 
+type position struct { + lsn atomic.Value +} + +func newPosition() *position { + p := position{} + p.setPosition(0) + return &p +} + +func (p *position) getPosition() storage.LSN { + return p.lsn.Load().(storage.LSN) +} + +func (p *position) setPosition(pos storage.LSN) { + p.lsn.Store(pos) +} + +type testLogHooks struct { + BeforeAppendLogEntry func() +} + +// Manager is responsible for managing the Write-Ahead Log (WAL) entries on disk. It maintains the in-memory state +// and indexing system that reflect the functional state of the WAL. The Manager ensures safe and consistent +// proposals, applications, and prunings of log entries, acting as the interface for transactional log operations. It +// coordinates with the LogConsumer to allow safe consumption of log entries while handling retention and cleanup based on +// references and acknowledgements. It effectively abstracts WAL operations from the TransactionManager, contributing to +// a cleaner separation of concerns and making the system more maintainable and extensible. +type Manager struct { + // mutex protects access to critical states, especially `oldestLSN` and `appendedLSN`, as well as the integrity + // of inflight log entries. Since indices are monotonic, two parallel log appending operations result in pushing + // files into the same directory and breaking the manifest file. Thus, parallel log entry appending and pruning + // are not supported. + mutex sync.Mutex + + // storageName is the name of the storage the Manager's partition is a member of. + storageName string + // partitionID is the ID of the partition this manager is operating on. + partitionID storage.PartitionID + + // tmpDirectory is the directory storing temporary data. One example is log entry deletion. WAL moves a log + // entry to this dir before removing it completely. + tmpDirectory string + // stateDirectory is an absolute path to a directory where the write-ahead log stores log entries. + stateDirectory string + + // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log. + appendedLSN storage.LSN + // oldestLSN holds the LSN of the oldest log entry which is still kept in the database. The manager keeps + // such entries because they are still referred to by a transaction. + oldestLSN storage.LSN + + // consumer is an external caller that may perform read-only operations against applied log entries. Log entries + // are retained until the consumer has acknowledged past their LSN. + consumer storage.LogConsumer + // positions tracks the positions of log entries being used externally. Those positions are tracked so that the WAL does not prune a log entry that is still being used. + positions map[positionType]*position + + // notifyQueue is a queue that notifies the caller when there is a new change. + notifyQueue chan struct{} + + // TestHooks are used in the tests to trigger logic at certain points in the execution. + // They are used to synchronize more complex test scenarios. Not used in production. + TestHooks testLogHooks +} + +// NewManager returns an instance of Manager.
+func NewManager(storageName string, partitionID storage.PartitionID, stagingDirectory string, stateDirectory string, consumer storage.LogConsumer) *Manager { + positions := map[positionType]*position{ + appliedPosition: newPosition(), + } + if consumer != nil { + positions[consumerPosition] = newPosition() + } + return &Manager{ + storageName: storageName, + partitionID: partitionID, + tmpDirectory: stagingDirectory, + stateDirectory: stateDirectory, + consumer: consumer, + positions: positions, + notifyQueue: make(chan struct{}, 1), + TestHooks: testLogHooks{ + BeforeAppendLogEntry: func() {}, + }, + } +} + +// Initialize sets up the initial state of the Manager, preparing it to manage the write-ahead log entries. It uses +// the last applied LSN, supplied by the caller, to resume from where it left off, creates necessary directories, and +// initializes in-memory tracking variables such as appendedLSN and oldestLSN based on the files present in the WAL +// directory. This method also removes any stale log files that may have been left due to interrupted operations, +// ensuring the WAL directory only contains valid log entries. If a LogConsumer is present, it notifies it of the +// initial log entry state, enabling consumers to start processing from the correct point. Proper initialization is +// crucial for maintaining data consistency and ensuring that log entries are managed accurately upon system startup. +func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) error { + if err := mgr.createStateDirectory(ctx); err != nil { + return fmt.Errorf("create state directory: %w", err) + } + + // The LSN of the last appended log entry is determined from the LSN of the latest entry in the log and the + // latest applied log entry. As a log entry could still be used by consumers (such as the log consumer for backup), it + // needs to be retained until it is no longer used by any component. + // oldestLSN is initialized to appliedLSN + 1. If there are no log entries in the log, then everything has been + // pruned already or there have not been any log entries yet. Setting this +1 avoids trying to clean up log entries + // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. + mgr.oldestLSN = appliedLSN + 1 + // appendedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or + // all log entries have been applied and have already been pruned. If there are some in the log, we'll update this + // below to match. + mgr.appendedLSN = appliedLSN + + if logEntries, err := os.ReadDir(StatePath(mgr.stateDirectory)); err != nil { + return fmt.Errorf("read wal directory: %w", err) + } else if len(logEntries) > 0 { + if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { + return fmt.Errorf("parse oldest LSN: %w", err) + } + if mgr.appendedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { + return fmt.Errorf("parse appended LSN: %w", err) + } + } + + if mgr.consumer != nil && mgr.appendedLSN != 0 { + // Set the acknowledged position to oldestLSN - 1 and notify the consumer of the range oldestLSN -> appendedLSN. + // If the position were set to 0, the consumer would be unable to read the already pruned entries anyway.
+ mgr.AcknowledgeConsumerPosition(mgr.oldestLSN - 1) + mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) + } + mgr.AcknowledgeAppliedPosition(appliedLSN) + + return nil +} + +// AcknowledgeAppliedPosition acknowledges the position of latest applied log entry. +func (mgr *Manager) AcknowledgeAppliedPosition(lsn storage.LSN) { + mgr.positions[appliedPosition].setPosition(lsn) +} + +// AcknowledgeConsumerPosition acknowledges log entries up and including LSN as successfully processed for the specified +// LogConsumer. The manager is awakened if it is currently awaiting a new or completed transaction. +func (mgr *Manager) AcknowledgeConsumerPosition(lsn storage.LSN) { + if mgr.consumer == nil { + panic("log manager's consumer must be present prior to AcknowledgeConsumerPos call") + } + mgr.positions[consumerPosition].setPosition(lsn) + + // Alert the outsider. If it has a pending acknowledgement already no action is required. + select { + case mgr.notifyQueue <- struct{}{}: + default: + } +} + +// StateDirectory returns the state directory under the management of this manager. +func (mgr *Manager) StateDirectory() string { + return mgr.stateDirectory +} + +// NotifyQueue returns a notify channel so that caller can poll new changes. +func (mgr *Manager) NotifyQueue() <-chan struct{} { + return mgr.notifyQueue +} + +// AppendLogEntry appends an entry to the write-ahead log. logEntryPath is an +// absolute path to the directory that represents the log entry. appendLogEntry +// moves the log entry's directory to the WAL, and returns its LSN once it has +// been committed to the log. +func (mgr *Manager) AppendLogEntry(ctx context.Context, logEntryPath string) (storage.LSN, error) { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + nextLSN := mgr.appendedLSN + 1 + mgr.TestHooks.BeforeAppendLogEntry() + + // Move the log entry from the staging directory into its place in the log. + destinationPath := mgr.GetEntryPath(nextLSN) + if err := os.Rename(logEntryPath, destinationPath); err != nil { + return 0, fmt.Errorf("move wal files: %w", err) + } + + // After this sync, the log entry has been persisted and will be recovered on failure. + if err := safe.NewSyncer().SyncParent(ctx, destinationPath); err != nil { + // If this fails, the log entry will be left in the write-ahead log but it is not + // properly persisted. If the fsync fails, something is seriously wrong and there's no + // point trying to delete the files. The right thing to do is to terminate Gitaly + // immediately as going further could cause data loss and corruption. This error check + // will later be replaced with a panic that terminates Gitaly. + // + // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 + return 0, fmt.Errorf("sync log entry: %w", err) + } + + mgr.appendedLSN = nextLSN + + if mgr.consumer != nil { + mgr.consumer.NotifyNewEntries(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) + } + return nextLSN, nil +} + +func (mgr *Manager) createStateDirectory(ctx context.Context) error { + needsFsync := false + for _, path := range []string{ + mgr.stateDirectory, + filepath.Join(mgr.stateDirectory, "wal"), + } { + err := os.Mkdir(path, mode.Directory) + switch { + case errors.Is(err, fs.ErrExist): + continue + case err != nil: + return fmt.Errorf("mkdir: %w", err) + } + + // The directory was created so we need to fsync. + needsFsync = true + } + + // If the directories already existed and we didn't create them, don't fsync. 
+ if !needsFsync { + return nil + } + + syncer := safe.NewSyncer() + if err := syncer.SyncRecursive(ctx, mgr.stateDirectory); err != nil { + return fmt.Errorf("sync: %w", err) + } + + if err := syncer.SyncParent(ctx, mgr.stateDirectory); err != nil { + return fmt.Errorf("sync parent: %w", err) + } + + return nil +} + +// PruneLogEntries prunes log entries from the Write-Ahead Log (WAL) that have been committed and are no longer +// needed. It ensures efficient storage management by removing redundant entries while maintaining the integrity of the +// log sequence. The method respects the established low-water mark, ensuring no entries that might still be required +// for transaction consistency are deleted. +func (mgr *Manager) PruneLogEntries(ctx context.Context) error { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // Even after a log entry is applied, we cannot delete it if any earlier log entry is still referred to, as + // that would create a "hole" in the list. A transaction referring to a log entry at the + // low-water mark might scan all of the log entries after it, so the manager needs to keep them. + // + // ┌── Consumer not acknowledged + // │ ┌─ Applied til this point + // Can remove │ │ ┌─ Free, but cannot be removed + // ◄───────────► │ │ │ + // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ + // └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ + // └─ oldestLSN ▲ + // │ + // Low-water mark + // + for mgr.oldestLSN < mgr.lowWaterMark() { + if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { + return fmt.Errorf("deleting log entry: %w", err) + } + mgr.oldestLSN++ + } + return nil +} + +// AppendedLSN returns the LSN of the latest appended log entry. +func (mgr *Manager) AppendedLSN() storage.LSN { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + return mgr.appendedLSN +} + +// GetEntryPath returns the path of the log entry's root directory. +func (mgr *Manager) GetEntryPath(lsn storage.LSN) string { + return EntryPath(mgr.stateDirectory, lsn) +} + +// deleteLogEntry deletes the log entry at the given LSN from the log. +func (mgr *Manager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { + defer trace.StartRegion(ctx, "deleteLogEntry").End() + + tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") + if err != nil { + return fmt.Errorf("mkdir temp: %w", err) + } + + logEntryPath := EntryPath(mgr.stateDirectory, lsn) + // We can't delete a directory atomically as we have to first delete all of its content. + // If the deletion was interrupted, we'd be left with a corrupted log entry on the disk. + // To perform the deletion atomically, we move the to-be-deleted log entry out of the + // log into a temporary directory and sync the move. After that, the log entry is no longer + // in the log, and we can delete the files without having to worry about the deletion being + // interrupted and being left with a corrupted log entry. + if err := os.Rename(logEntryPath, filepath.Join(tmpDir, "to_delete")); err != nil { + return fmt.Errorf("rename: %w", err) + } + + if err := safe.NewSyncer().SyncParent(ctx, logEntryPath); err != nil { + return fmt.Errorf("sync file deletion: %w", err) + } + + // With the log entry removed from the log, we can now delete the files. There's no need + // to sync the deletions as the log entry is now in a temporary directory that will be removed + // on startup if it is left around after a crash.
+ if err := os.RemoveAll(tmpDir); err != nil { + return fmt.Errorf("remove files: %w", err) + } + + return nil +} + +// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than +// this mark are removed. +func (mgr *Manager) lowWaterMark() storage.LSN { + minAcknowledged := mgr.appendedLSN + 1 + + // Position is the last acknowledged LSN, this is eligible for pruning. + // lowWaterMark returns the lowest LSN that cannot be pruned, so add one. + for _, pos := range mgr.positions { + if pos.getPosition()+1 < minAcknowledged { + minAcknowledged = pos.getPosition() + 1 + } + } + + return minAcknowledged +} diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go new file mode 100644 index 0000000000000000000000000000000000000000..27ca31a57e1336e463bde10f952df8437ae89c94 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -0,0 +1,577 @@ +package log + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func appendLogEntry(t *testing.T, ctx context.Context, manager *Manager, files map[string][]byte) storage.LSN { + t.Helper() + + logEntryPath := testhelper.TempDir(t) + for name, value := range files { + path := filepath.Join(logEntryPath, name) + require.NoError(t, os.WriteFile(path, value, mode.File)) + } + + nextLSN, err := manager.AppendLogEntry(ctx, logEntryPath) + require.NoError(t, err) + + return nextLSN +} + +func setupLogManager(t *testing.T, ctx context.Context, consumer storage.LogConsumer) *Manager { + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), testhelper.TempDir(t), consumer) + require.NoError(t, logManager.Initialize(ctx, 0)) + + return logManager +} + +func TestLogManager_Initialize(t *testing.T) { + t.Parallel() + + t.Run("initial state without prior log entries", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(0), logManager.appendedLSN) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("existing WAL entries without existing appliedLSN", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(2), logManager.appendedLSN) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) 
+ + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("existing WAL entries with appliedLSN in-between", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.createStateDirectory(ctx)) + + for i := 0; i < 3; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 2)) + + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(3), logManager.appendedLSN) + require.Equal(t, storage.LSN(3), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("existing WAL entries with up-to-date appliedLSN", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + stateDir := testhelper.TempDir(t) + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 0)) + + for i := 0; i < 3; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, nil) + require.NoError(t, logManager.Initialize(ctx, 3)) + + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + require.Equal(t, storage.LSN(3), logManager.appendedLSN) + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) +} + +func TestLogManager_PruneLogEntries(t *testing.T) { + t.Parallel() + + t.Run("no entries to remove", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Expect no entries to be removed + require.NoError(t, logManager.PruneLogEntries(ctx)) + require.Equal(t, storage.LSN(1), logManager.oldestLSN) + + // Assert on-disk state + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": 
{Mode: mode.Directory}, + }) + }) + + t.Run("remove single applied entry", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Inject a single log entry + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + + // Set this entry as applied + logManager.AcknowledgeAppliedPosition(1) + + // Before removal + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + }) + + // Attempt to remove applied log entries + require.NoError(t, logManager.PruneLogEntries(ctx)) + + // After removal + require.Equal(t, storage.LSN(2), logManager.oldestLSN) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("retain entry due to low-water mark constraint", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, &mockLogConsumer{}) + + // Inject multiple log entries + for i := 0; i < 3; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + // Set the applied LSN to 2 + logManager.AcknowledgeAppliedPosition(2) + // Manually set the consumer's position to the first entry, forcing low-water mark to retain it + logManager.AcknowledgeConsumerPosition(1) + + // Before removal + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + require.NoError(t, logManager.PruneLogEntries(ctx)) + require.Equal(t, storage.LSN(2), logManager.oldestLSN) + + // Assert on-disk state to ensure no entries were removed + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("remove multiple applied entries", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + // Inject multiple log entries + for i := 0; i < 5; i++ { + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte(fmt.Sprintf("content-%d", i+1))}) + } + + // Set the applied LSN to 3, allowing the first three entries to be pruned + logManager.AcknowledgeAppliedPosition(3) + + // Before removal + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: 
mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + "/wal/0000000000004": {Mode: mode.Directory}, + "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, + "/wal/0000000000005": {Mode: mode.Directory}, + "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, + }) + + require.NoError(t, logManager.PruneLogEntries(ctx)) + + // Ensure only entries starting from LSN 4 are retained + require.Equal(t, storage.LSN(4), logManager.oldestLSN) + + // Assert on-disk state after removals + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000004": {Mode: mode.Directory}, + "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, + "/wal/0000000000005": {Mode: mode.Directory}, + "/wal/0000000000005/1": {Mode: mode.File, Content: []byte("content-5")}, + }) + }) +} + +func TestLogManager_AppendLogEntry(t *testing.T) { + t.Parallel() + + t.Run("append a log entry with a single file", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + }) + }) + + t.Run("append a log entry with multiple files", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{ + "1": []byte("content-1"), + "2": []byte("content-2"), + "3": []byte("content-3"), + }) + + require.Equal(t, logManager.appendedLSN, storage.LSN(1)) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000001/2": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000001/3": {Mode: mode.File, Content: []byte("content-3")}, + }) + }) + + t.Run("append multiple entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + logManager := setupLogManager(t, ctx, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2-1"), "2": []byte("content-2-2")}) + appendLogEntry(t, ctx, logManager, nil) + + require.Equal(t, logManager.appendedLSN, storage.LSN(3)) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: 
mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2-1")}, + "/wal/0000000000002/2": {Mode: mode.File, Content: []byte("content-2-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + }) + }) +} + +type mockLogConsumer struct { + mu sync.Mutex + positions [][]storage.LSN +} + +func (c *mockLogConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, oldestLSN, appendedLSN storage.LSN) { + c.mu.Lock() + defer c.mu.Unlock() + + c.positions = append(c.positions, []storage.LSN{oldestLSN, appendedLSN}) +} + +func TestLogManager_Positions(t *testing.T) { + ctx := testhelper.Context(t) + + simulatePositions := func(t *testing.T, logManager *Manager, consumed storage.LSN, applied storage.LSN) { + logManager.AcknowledgeConsumerPosition(consumed) + logManager.AcknowledgeAppliedPosition(applied) + require.NoError(t, logManager.PruneLogEntries(ctx)) + } + + t.Run("consumer pos is set to 0 after initialized", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + require.Equal(t, [][]storage.LSN(nil), mockConsumer.positions) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("notify consumer after restart", func(t *testing.T) { + stateDir := testhelper.TempDir(t) + + // Before restart + mockConsumer := &mockLogConsumer{} + + logManager := NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + require.NoError(t, logManager.Initialize(ctx, 0)) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + // Apply to 3 but consume to 1 + simulatePositions(t, logManager, 1, 2) + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + + // Inject 3, 4 + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-4")}) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + "/wal/0000000000004": {Mode: mode.Directory}, + "/wal/0000000000004/1": {Mode: mode.File, Content: []byte("content-4")}, + }) + + // Restart the log consumer. + mockConsumer = &mockLogConsumer{} + logManager = NewManager("test-storage", 1, testhelper.TempDir(t), stateDir, mockConsumer) + require.NoError(t, logManager.Initialize(ctx, 2)) + + // Notify consumer to consume from 2 -> 4 + require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions) + + // Both consumer and applier catch up. + simulatePositions(t, logManager, 4, 4) + + // All log entries are pruned at this point. The consumer should not be notified again. 
+ require.Equal(t, [][]storage.LSN{{2, 4}}, mockConsumer.positions) + require.Equal(t, storage.LSN(5), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("unacknowledged entries are not pruned", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + simulatePositions(t, logManager, 0, 2) + + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("acknowledged entries got pruned", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + simulatePositions(t, logManager, 1, 2) + + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("entries consumed faster than applied", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + + simulatePositions(t, logManager, 2, 0) + + require.Equal(t, [][]storage.LSN{{1, 1}, {1, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(1), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte("content-1")}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + }) + }) + + t.Run("acknowledge entries one by one", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + simulatePositions(t, logManager, 1, 1) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + simulatePositions(t, logManager, 2, 2) + + // The oldest LSN changes after each acknowledgement + require.Equal(t, [][]storage.LSN{{1, 1}, {2, 2}}, mockConsumer.positions) + require.Equal(t, storage.LSN(3), 
logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("append while consumer is busy with prior entries", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + simulatePositions(t, logManager, 0, 1) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + simulatePositions(t, logManager, 0, 2) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + simulatePositions(t, logManager, 3, 3) + + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) + + t.Run("acknowledged entries not pruned if not applied", func(t *testing.T) { + mockConsumer := &mockLogConsumer{} + logManager := setupLogManager(t, ctx, mockConsumer) + + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-2")}) + appendLogEntry(t, ctx, logManager, map[string][]byte{"1": []byte("content-3")}) + + // 2 and 3 are not applied, hence kept intact. + simulatePositions(t, logManager, 3, 1) + + require.Equal(t, storage.LSN(2), logManager.lowWaterMark()) + + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000002": {Mode: mode.Directory}, + "/wal/0000000000002/1": {Mode: mode.File, Content: []byte("content-2")}, + "/wal/0000000000003": {Mode: mode.Directory}, + "/wal/0000000000003/1": {Mode: mode.File, Content: []byte("content-3")}, + }) + + simulatePositions(t, logManager, 3, 3) + require.Equal(t, storage.LSN(4), logManager.lowWaterMark()) + testhelper.RequireDirectoryState(t, logManager.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/log/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/log/testhelper_test.go new file mode 100644 index 0000000000000000000000000000000000000000..62ab82a853622a44d518f803be31a1238c0520a5 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/log/testhelper_test.go @@ -0,0 +1,11 @@ +package log + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index e7ef57edc7dac0d791b28c7ed2f310cd2e36285a..39c0cfc437dfc7b3bfa3111552c27f8707ea83c7 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -713,7 +713,7 @@ type testTransactionSetup struct { NonExistentOID git.ObjectID Commits testTransactionCommits AnnotatedTags []testTransactionTag - Consumer LogConsumer + Consumer storage.LogConsumer } type testTransactionHooks struct { @@ -721,8 +721,6 @@ type testTransactionHooks struct { BeforeApplyLogEntry hookFunc // BeforeAppendLogEntry is called before a log entry is appended to the log. 
BeforeAppendLogEntry hookFunc - // AfterDeleteLogEntry is called after a log entry is deleted. - AfterDeleteLogEntry hookFunc // BeforeReadAppliedLSN is invoked before the applied LSN is read. BeforeReadAppliedLSN hookFunc // BeforeStoreAppliedLSN is invoked before the applied LSN is stored. @@ -938,7 +936,7 @@ type Prune struct { ExpectedObjects []git.ObjectID } -// ConsumerAcknowledge calls AcknowledgeTransaction for all consumers. +// ConsumerAcknowledge calls AcknowledgeConsumerPosition for all consumers. type ConsumerAcknowledge struct { // LSN is the LSN acknowledged by the consumers. LSN storage.LSN @@ -966,9 +964,6 @@ type StateAssertion struct { // Repositories is the expected state of the repositories in the storage. The key is // the repository's relative path and the value describes its expected state. Repositories RepositoryStates - // Consumers is the expected state of the consumers and their position as tracked by - // the TransactionManager. - Consumers ConsumerState } // AdhocAssertion allows a test to add some custom assertions apart from the built-in assertions above. @@ -995,36 +990,10 @@ type MockLogConsumer struct { highWaterMark storage.LSN } -func (lc *MockLogConsumer) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { +func (lc *MockLogConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) { lc.highWaterMark = highWaterMark } -// ConsumerState is used to track the log positions received by the consumer and the corresponding -// acknowledgements from the consumer to the manager. We deliberately do not track the LowWaterMark -// sent to consumers as this is non-deterministic. -type ConsumerState struct { - // ManagerPosition is the last acknowledged LSN for the consumer as tracked by the TransactionManager. - ManagerPosition storage.LSN - // HighWaterMark is the latest high water mark received by the consumer from NotifyNewTransactions. - HighWaterMark storage.LSN -} - -// RequireConsumer asserts the consumer log position is correct. -func RequireConsumer(t *testing.T, consumer LogConsumer, consumerPos *consumerPosition, expected ConsumerState) { - t.Helper() - - require.Equal(t, expected.ManagerPosition, consumerPos.getPosition(), "expected and actual manager position don't match") - - if consumer == nil { - return - } - - mock, ok := consumer.(*MockLogConsumer) - require.True(t, ok) - - require.Equal(t, expected.HighWaterMark, mock.highWaterMark, "expected and actual high water marks don't match") -} - // steps defines execution steps in a test. Each test case can define multiple steps to exercise // more complex behavior. 
type steps []any @@ -1455,7 +1424,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction := openTransactions[step.TransactionID] transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: - transactionManager.AcknowledgeTransaction(step.LSN) + transactionManager.logManager.AcknowledgeConsumerPosition(step.LSN) case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] @@ -1573,8 +1542,6 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } } - RequireConsumer(t, transactionManager.consumer, transactionManager.consumerPos, tc.expectedState.Consumers) - testhelper.RequireDirectoryState(t, stateDir, "", expectedDirectory) expectedStagingDirState := testhelper.DirectoryState{ diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 6d3d799007cda1d7db97b218b41f371b959123a3..6ef6c7be45b55462703e2e3a3493669df5301555 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -37,10 +37,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal/reftree" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/tracing" @@ -329,7 +330,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn := &Transaction{ write: opts.Write, commit: mgr.commit, - snapshotLSN: mgr.appendedLSN, + snapshotLSN: mgr.logManager.AppendedLSN(), finished: make(chan struct{}), relativePath: relativePath, metrics: mgr.metrics, @@ -370,9 +371,12 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti // entries. We signal only if the transaction modifies the in-memory committed entry. // This signal queue is buffered. If the queue is full, the manager hasn't woken up. The // next scan will cover the work of the prior one. So, no need to let the transaction wait. - // ┌─ 1st signal ┌─ The manager scans til here - // □ □ □ □ □ □ □ □ □ □ ■ ■ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ⧅ ⧅ ⧅ ⧅ ■ - // └─ 2nd signal + // + // ┌─ 1st signal ┌── The manager scans til here + // ┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ + // └─┘ └─┘ └┬┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ + // └─ 2nd signal + // if removedAnyEntry { select { case mgr.completedQueue <- struct{}{}: @@ -408,7 +412,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti } if admitted { - // If the transcation was admitted, `.Run()` is responsible for cleaning the transaction up. + // If the transaction was admitted, `.Run()` is responsible for cleaning the transaction up. 
// Cleaning up the snapshots can take a relatively long time if the snapshots are large, or if // the file system is busy. To avoid blocking transaction processing, we use a pool of background // workers to clean up the transaction snapshots. @@ -876,6 +880,8 @@ type snapshotLock struct { // committedEntry is a wrapper for a log entry. It is used to keep track of entries in which their snapshots are still // accessed by other transactions. type committedEntry struct { + // entry is the in-memory copy of the referenced log entry. + entry *gitalypb.LogEntry // lsn is the associated LSN of the entry lsn storage.LSN // snapshotReaders accounts for the number of transaction readers of the snapshot. @@ -884,43 +890,10 @@ type committedEntry struct { objectDependencies map[git.ObjectID]struct{} } -// AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed -// for the specified LogConsumer. The manager is awakened if it is currently awaiting a new or -// completed transaction. -func (mgr *TransactionManager) AcknowledgeTransaction(lsn storage.LSN) { - mgr.consumerPos.setPosition(lsn) - - // Alert the manager. If it has a pending acknowledgement already no action is required. - select { - case mgr.acknowledgedQueue <- struct{}{}: - default: - } -} - -// GetTransactionPath returns the path of the log entry's root directory. -func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string { - return walFilesPathForLSN(mgr.stateDirectory, lsn) -} - -// consumerPosition tracks the last LSN acknowledged for a consumer. -type consumerPosition struct { - // position is the last LSN acknowledged as completed by the consumer. - position storage.LSN - sync.Mutex -} - -func (p *consumerPosition) getPosition() storage.LSN { - p.Lock() - defer p.Unlock() - - return p.position -} - -func (p *consumerPosition) setPosition(pos storage.LSN) { - p.Lock() - defer p.Unlock() - - p.position = pos +// GetLogManager provides controlled access to the underlying log management system for log consumption purposes. It +// allows consumers to resolve the on-disk location of an LSN and to acknowledge their consumed position. +func (mgr *TransactionManager) GetLogManager() storage.LogManager { + return mgr.logManager } // TransactionManager is responsible for transaction management of a single repository. Each repository has @@ -966,7 +939,7 @@ type TransactionManager struct { // close cancels ctx and stops the transaction processing. close context.CancelFunc // logger is the logger to use to write log messages. - logger log.Logger + logger logging.Logger // closing is closed when close is called. It unblocks transactions that are waiting to be admitted. closing <-chan struct{} @@ -974,9 +947,6 @@ type TransactionManager struct { // being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly // releases awaiters when the transactions processing is stopped. closed chan struct{} - // stateDirectory is an absolute path to a directory where the TransactionManager stores the state related to its - // write-ahead log. - stateDirectory string // stagingDirectory is a path to a directory where this TransactionManager should stage the files of the transactions // before it logs them. The TransactionManager cleans up the files during runtime but stale files may be // left around after crashes.
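// Editor's note: illustrative sketch only, not part of this change. It shows how a log consumer might drive the
// storage.LogManager handed out by GetLogManager above, using only the methods this diff already relies on
// (GetEntryPath, AcknowledgeConsumerPosition). The processEntry callback is hypothetical; the real consumer is the
// LogEntryArchiver. Assumes the surrounding package's imports (fmt, storage).
func consumeEntries(lm storage.LogManager, low, high storage.LSN, processEntry func(entryPath string) error) error {
	for lsn := low; lsn <= high; lsn++ {
		// Resolve the on-disk directory of the log entry and hand it to the consumer.
		if err := processEntry(lm.GetEntryPath(lsn)); err != nil {
			return fmt.Errorf("process entry %s: %w", lsn, err)
		}

		// Acknowledging the position allows the manager to prune the entry once it has also been applied.
		lm.AcknowledgeConsumerPosition(lsn)
	}

	return nil
}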
The files are temporary and any leftover files are expected to be cleaned up when @@ -995,6 +965,8 @@ type TransactionManager struct { partitionID storage.PartitionID // db is the handle to the key-value store used for storing the write-ahead log related state. db keyvalue.Transactioner + // logManager manages the underlying Write-Ahead Log entries. + logManager *log.Manager // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction @@ -1029,14 +1001,8 @@ type TransactionManager struct { // conflictMgr is responsible for checking concurrent transactions against each other for conflicts. conflictMgr *conflict.Manager - // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log. - appendedLSN storage.LSN // appliedLSN holds the LSN of the last log entry applied to the partition. appliedLSN storage.LSN - // oldestLSN holds the LSN of the head of log entries which is still kept in the database. The manager keeps - // them because they are still referred by a transaction. - oldestLSN storage.LSN - // awaitingTransactions contains transactions waiting for their log entry to be applied to // the partition. It's keyed by the LSN the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. @@ -1048,14 +1014,6 @@ type TransactionManager struct { // the corresponding snapshots. committedEntries *list.List - // consumer is an the external caller that may perform read-only operations against applied - // log entries. Log entries are retained until the consumer has acknowledged past their LSN. - consumer LogConsumer - // consumerPos tracks the largest LSN that has been acknowledged by consumer. - consumerPos *consumerPosition - // acknowledgedQueue is a queue notifying when a transaction has been acknowledged. - acknowledgedQueue chan struct{} - // testHooks are used in the tests to trigger logic at certain points in the execution. // They are used to synchronize more complex test scenarios. Not used in production. testHooks testHooks @@ -1065,18 +1023,16 @@ type TransactionManager struct { } type testHooks struct { - beforeInitialization func() - beforeAppendLogEntry func() - beforeApplyLogEntry func() - beforeStoreAppliedLSN func() - beforeDeleteLogEntryFiles func() - beforeRunExiting func() + beforeInitialization func() + beforeApplyLogEntry func() + beforeStoreAppliedLSN func() + beforeRunExiting func() } // NewTransactionManager returns a new TransactionManager for the given repository. 
func NewTransactionManager( ptnID storage.PartitionID, - logger log.Logger, + logger logging.Logger, db keyvalue.Transactioner, storageName, storagePath, @@ -1085,12 +1041,10 @@ func NewTransactionManager( cmdFactory gitcmd.CommandFactory, repositoryFactory localrepo.StorageScopedFactory, metrics ManagerMetrics, - consumer LogConsumer, + consumer storage.LogConsumer, ) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) - consumerPos := &consumerPosition{} - cleanupWorkers := &errgroup.Group{} cleanupWorkers.SetLimit(25) @@ -1106,29 +1060,24 @@ func NewTransactionManager( storagePath: storagePath, partitionID: ptnID, db: db, + logManager: log.NewManager(storageName, ptnID, stagingDir, stateDir, consumer), admissionQueue: make(chan *Transaction), completedQueue: make(chan struct{}, 1), initialized: make(chan struct{}), snapshotLocks: make(map[storage.LSN]*snapshotLock), conflictMgr: conflict.NewManager(), - stateDirectory: stateDir, stagingDirectory: stagingDir, cleanupWorkers: cleanupWorkers, cleanupWorkerFailed: make(chan struct{}), awaitingTransactions: make(map[storage.LSN]resultChannel), committedEntries: list.New(), metrics: metrics, - consumer: consumer, - consumerPos: consumerPos, - acknowledgedQueue: make(chan struct{}, 1), testHooks: testHooks{ - beforeInitialization: func() {}, - beforeAppendLogEntry: func() {}, - beforeApplyLogEntry: func() {}, - beforeStoreAppliedLSN: func() {}, - beforeDeleteLogEntryFiles: func() {}, - beforeRunExiting: func() {}, + beforeInitialization: func() {}, + beforeApplyLogEntry: func() {}, + beforeStoreAppliedLSN: func() {}, + beforeRunExiting: func() {}, }, } } @@ -1794,7 +1743,7 @@ func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, tran Name: "pack-refs", // By using the '--auto' flag, we ensure that git uses the best heuristic // for compaction. For reftables, it currently uses a geometric progression. - // This ensures we don't keep compacting unecessarily to a single file. + // This ensures we don't keep compacting unnecessarily to a single file. Flags: []gitcmd.Option{gitcmd.Flag{Name: "--auto"}}, }, gitcmd.WithStderr(&stderr)); err != nil { return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String()) @@ -2274,38 +2223,16 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { } for { - if mgr.appliedLSN < mgr.appendedLSN { + if mgr.appliedLSN < mgr.logManager.AppendedLSN() { lsn := mgr.appliedLSN + 1 - if err := mgr.applyLogEntry(ctx, lsn); err != nil { return fmt.Errorf("apply log entry: %w", err) } - continue } - // When a log entry is applied, if there is any log in front of it which are still referred, we cannot delete - // it. This condition is to prevent a "hole" in the list. A transaction referring to a log entry at the - // low-water mark might scan all afterward log entries. Thus, the manager needs to keep in the database. - // - // ┌─ Oldest LSN - // ┌─ Can be removed ─┐ ┌─ Cannot be removed - // □ □ □ □ □ □ □ □ □ □ ■ ■ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ⧅ ⧅ ⧅ ⧅ ■ - // └─ Low-water mark, still referred by another transaction - if mgr.oldestLSN < mgr.lowWaterMark() { - if err := mgr.deleteLogEntry(ctx, mgr.oldestLSN); err != nil { - return fmt.Errorf("deleting log entry: %w", err) - } - - // The WAL entries are deleted only after there are no transactions using an - // older read snapshot than the LSN. It's also safe to drop the transaction - // from the conflict detection history as there are no transactions reading - // at an older snapshot. 
Since the changes are already in the transaction's - // snapshot, it would already base its changes on them. - mgr.conflictMgr.EvictLSN(ctx, mgr.oldestLSN) - - mgr.oldestLSN++ - continue + if err := mgr.logManager.PruneLogEntries(mgr.ctx); err != nil { + return fmt.Errorf("pruning log entries: %w", err) } if err := mgr.processTransaction(ctx); err != nil { @@ -2335,7 +2262,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned return errors.New("cleanup worker failed") case <-mgr.completedQueue: return nil - case <-mgr.acknowledgedQueue: + case <-mgr.logManager.NotifyQueue(): return nil case <-ctx.Done(): } @@ -2394,6 +2321,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned // Prepare the transaction to conflict check it. We'll commit it later if we // succeed logging the transaction. + mgr.mutex.Lock() preparedTX, err := mgr.conflictMgr.Prepare(ctx, &conflict.Transaction{ ReadLSN: transaction.SnapshotLSN(), TargetRelativePath: transaction.relativePath, @@ -2401,6 +2329,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned ZeroOID: zeroOID, ReferenceUpdates: transaction.referenceUpdates, }) + mgr.mutex.Unlock() if err != nil { return fmt.Errorf("prepare: %w", err) } @@ -2461,7 +2390,9 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned } // Commit the prepared transaction now that we've managed to commit the log entry. - preparedTX.Commit(ctx, mgr.appendedLSN) + mgr.mutex.Lock() + preparedTX.Commit(ctx, mgr.logManager.AppendedLSN()) + mgr.mutex.Unlock() return nil }(); err != nil { @@ -2469,7 +2400,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned return nil } - mgr.awaitingTransactions[mgr.appendedLSN] = transaction.result + mgr.awaitingTransactions[mgr.logManager.AppendedLSN()] = transaction.result return nil } @@ -2585,8 +2516,9 @@ func (mgr *TransactionManager) snapshotsDir() string { return filepath.Join(mgr.stagingDirectory, "snapshots") } -// initialize initializes the TransactionManager's state from the database. It loads the appended and the applied -// LSNs and initializes the notification channels that synchronize transaction beginning with log entry applying. +// initialize initializes the TransactionManager's state from the database. It initializes the WAL log manager, loads the +// applied LSN, and initializes the notification channels that synchronize transaction beginning with log entry +// applying. func (mgr *TransactionManager) initialize(ctx context.Context) error { defer trace.StartRegion(ctx, "initialize").End() @@ -2599,8 +2531,8 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { mgr.appliedLSN = storage.LSN(appliedLSN.GetValue()) - if err := mgr.createStateDirectory(ctx); err != nil { - return fmt.Errorf("create state directory: %w", err) + if err := mgr.logManager.Initialize(ctx, mgr.appliedLSN); err != nil { + return fmt.Errorf("initialize log management: %w", err) } if err := os.Mkdir(mgr.snapshotsDir(), mode.Directory); err != nil { @@ -2612,40 +2544,6 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("new snapshot manager: %w", err) } - // The LSN of the last appended log entry is determined from the LSN of the latest entry in the log and - // the latest applied log entry. The manager also keeps track of committed entries and reserves them until there - // is no transaction refers them.
It's possible there are some left-over entries in the database because a - // transaction can hold the entry stubbornly. So, the manager could not clean them up in the last session. - // - // ┌─ oldestLSN ┌─ appendedLSN - // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ - // └─ appliedLSN - // - // - // oldestLSN is initialized to appliedLSN + 1. If there are no log entries in the log, then everything has been - // pruned already or there has not been any log entries yet. Setting this +1 avoids trying to clean up log entries - // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. - mgr.oldestLSN = mgr.appliedLSN + 1 - // appendedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or - // all log entries have been applied and have been already pruned. If there are some in the log, we'll update this - // below to match. - mgr.appendedLSN = mgr.appliedLSN - - if logEntries, err := os.ReadDir(walFilesPath(mgr.stateDirectory)); err != nil { - return fmt.Errorf("read wal directory: %w", err) - } else if len(logEntries) > 0 { - if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { - return fmt.Errorf("parse oldest LSN: %w", err) - } - if mgr.appendedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { - return fmt.Errorf("parse appended LSN: %w", err) - } - } - - if mgr.consumer != nil { - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) - } - // Create a snapshot lock for the applied LSN as it is used for synchronizing // the snapshotters with the log application. mgr.snapshotLocks[mgr.appliedLSN] = &snapshotLock{applied: make(chan struct{})} @@ -2653,14 +2551,10 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // Each unapplied log entry should have a snapshot lock as they are created in normal // operation when committing a log entry. Recover these entries. - for i := mgr.appliedLSN + 1; i <= mgr.appendedLSN; i++ { + for i := mgr.appliedLSN + 1; i <= mgr.logManager.AppendedLSN(); i++ { mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})} } - if err := mgr.removeStaleWALFiles(ctx, mgr.oldestLSN, mgr.appendedLSN); err != nil { - return fmt.Errorf("remove stale packs: %w", err) - } - mgr.testHooks.beforeInitialization() mgr.initializationSuccessful = true @@ -2687,98 +2581,11 @@ func (mgr *TransactionManager) doesRepositoryExist(ctx context.Context, relative return true, nil } -func (mgr *TransactionManager) createStateDirectory(ctx context.Context) error { - needsFsync := false - for _, path := range []string{ - mgr.stateDirectory, - filepath.Join(mgr.stateDirectory, "wal"), - } { - if err := os.Mkdir(path, mode.Directory); err != nil { - if !errors.Is(err, fs.ErrExist) { - return fmt.Errorf("mkdir: %w", err) - } - - continue - } - - // The directory was created so we need to fsync. - needsFsync = true - } - - // If the directories already existed and we didn't create them, don't fsync. - if !needsFsync { - return nil - } - - syncer := safe.NewSyncer() - if err := syncer.SyncRecursive(ctx, mgr.stateDirectory); err != nil { - return fmt.Errorf("sync: %w", err) - } - - if err := syncer.SyncParent(ctx, mgr.stateDirectory); err != nil { - return fmt.Errorf("sync parent: %w", err) - } - - return nil -} - // getAbsolutePath returns the relative path's absolute path in the storage. 
func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string { return filepath.Join(append([]string{mgr.storagePath}, relativePath...)...) } -// removeStaleWALFiles removes files from the log directory that have no associated log entry. -// Such files can be left around if transaction's files were moved in place successfully -// but the manager was interrupted before successfully persisting the log entry itself. -// If the manager deletes a log entry successfully from the database but is interrupted before it cleans -// up the associated files, such a directory can also be left at the head of the log. -func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, oldestLSN, appendedLSN storage.LSN) error { - needsFsync := false - for _, possibleStaleFilesPath := range []string{ - // Log entries are pruned one by one. If a write is interrupted, the only possible stale files would be - // for the log entry preceding the oldest log entry. - walFilesPathForLSN(mgr.stateDirectory, oldestLSN-1), - // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale - // files would be for the next LSN. Remove the files if they exist. - walFilesPathForLSN(mgr.stateDirectory, appendedLSN+1), - } { - - if _, err := os.Stat(possibleStaleFilesPath); err != nil { - if !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("stat: %w", err) - } - - // No stale files were present. - continue - } - - if err := os.RemoveAll(possibleStaleFilesPath); err != nil { - return fmt.Errorf("remove all: %w", err) - } - - needsFsync = true - } - - if needsFsync { - // Sync the parent directory to flush the file deletion. - if err := safe.NewSyncer().Sync(ctx, walFilesPath(mgr.stateDirectory)); err != nil { - return fmt.Errorf("sync: %w", err) - } - } - - return nil -} - -// walFilesPath returns the WAL directory's path. -func walFilesPath(stateDir string) string { - return filepath.Join(stateDir, "wal") -} - -// walFilesPathForLSN returns an absolute path to a given log entry's WAL files. -func walFilesPathForLSN(stateDir string, lsn storage.LSN) string { - return filepath.Join(walFilesPath(stateDir), lsn.String()) -} - // manifestPath returns the manifest file's path in the log entry. func manifestPath(logEntryPath string) string { return filepath.Join(logEntryPath, "MANIFEST") @@ -2941,7 +2748,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. // // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, @@ -3606,9 +3413,8 @@ func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, ch return nil } -// appendLogEntry appends the transaction to the write-ahead log. It first writes the transaction's manifest file -// into the log entry's directory. Afterwards it moves the log entry's directory from the staging area to its final -// place in the write-ahead log. +// appendLogEntry appends a transaction's log entry to the write-ahead log. After the log entry is appended to the WAL, +// the corresponding snapshot lock and in-memory reference for the latest appended LSN are created.
func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { defer trace.StartRegion(ctx, "appendLogEntry").End() @@ -3638,45 +3444,29 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende return fmt.Errorf("synchronizing WAL directory: %w", err) } - mgr.testHooks.beforeAppendLogEntry() - - nextLSN := mgr.appendedLSN + 1 - // Move the log entry from the staging directory into its place in the log. - destinationPath := walFilesPathForLSN(mgr.stateDirectory, nextLSN) - if err := os.Rename(logEntryPath, destinationPath); err != nil { - return fmt.Errorf("move wal files: %w", err) - } - - // Sync the WAL directory. The manifest has been synced above, and all of the other files - // have been synced before queuing for commit. At this point we just have to sync the - // directory entry of the new log entry in the WAL directory to finalize the commit. - // - // After this sync, the log entry has been persisted and will be recovered on failure. - if err := safe.NewSyncer().SyncParent(ctx, destinationPath); err != nil { - // If this fails, the log entry will be left in the write-ahead log but it is not - // properly persisted. If the fsync fails, something is seriously wrong and there's no - // point trying to delete the files. The right thing to do is to terminate Gitaly - // immediately as going further could cause data loss and corruption. This error check - // will later be replaced with a panic that terminates Gitaly. - // - // For more details, see: https://gitlab.com/gitlab-org/gitaly/-/issues/5774 - return fmt.Errorf("sync log entry: %w", err) - } + // Pre-create a snapshot lock entry for the assumed appended LSN. + mgr.mutex.Lock() + mgr.snapshotLocks[mgr.logManager.AppendedLSN()+1] = &snapshotLock{applied: make(chan struct{})} + mgr.mutex.Unlock() // After this latch block, the transaction is committed and all subsequent transactions // are guaranteed to read it.
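// Editor's note: sketch only, not part of this change. It restates the append-with-rollback pattern implemented
// right below as a standalone helper: the snapshot lock for the next LSN is created optimistically before handing
// the staged entry to the log manager, and removed again if the append fails. All identifiers are the ones this
// diff already introduces; the helper itself is hypothetical.
func appendWithSnapshotLock(ctx context.Context, mgr *TransactionManager, logEntryPath string) (storage.LSN, error) {
	next := mgr.logManager.AppendedLSN() + 1

	// Optimistically create the snapshot lock for the LSN the entry is expected to land at.
	mgr.mutex.Lock()
	mgr.snapshotLocks[next] = &snapshotLock{applied: make(chan struct{})}
	mgr.mutex.Unlock()

	lsn, err := mgr.logManager.AppendLogEntry(ctx, logEntryPath)
	if err != nil {
		// The append failed, so the lock created above would never be released; drop it again.
		mgr.mutex.Lock()
		delete(mgr.snapshotLocks, next)
		mgr.mutex.Unlock()

		return 0, fmt.Errorf("append log entry: %w", err)
	}

	return lsn, nil
}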
+ appendedLSN, err := mgr.logManager.AppendLogEntry(ctx, logEntryPath) + if err != nil { + mgr.mutex.Lock() + delete(mgr.snapshotLocks, mgr.logManager.AppendedLSN()+1) + mgr.mutex.Unlock() + return fmt.Errorf("append log entry: %w", err) + } + mgr.mutex.Lock() - mgr.appendedLSN = nextLSN - mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} mgr.committedEntries.PushBack(&committedEntry{ - lsn: nextLSN, + lsn: appendedLSN, + entry: logEntry, objectDependencies: objectDependencies, }) mgr.mutex.Unlock() - if mgr.consumer != nil { - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) - } return nil } @@ -3703,7 +3493,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS mgr.testHooks.beforeApplyLogEntry() if err := mgr.db.Update(func(tx keyvalue.ReadWriter) error { - if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, walFilesPathForLSN(mgr.stateDirectory, lsn), logEntry.GetOperations(), tx); err != nil { + if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, mgr.logManager.GetEntryPath(lsn), logEntry.GetOperations(), tx); err != nil { return fmt.Errorf("apply operations: %w", err) } @@ -3712,12 +3502,9 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS return fmt.Errorf("update: %w", err) } - mgr.testHooks.beforeStoreAppliedLSN() if err := mgr.storeAppliedLSN(lsn); err != nil { return fmt.Errorf("set applied LSN: %w", err) } - - mgr.appliedLSN = lsn mgr.snapshotManager.SetLSN(lsn) // There is no awaiter for a transaction if the transaction manager is recovering @@ -3762,43 +3549,9 @@ func (mgr *TransactionManager) createRepository(ctx context.Context, repositoryP return nil } -// deleteLogEntry deletes the log entry at the given LSN from the log. -func (mgr *TransactionManager) deleteLogEntry(ctx context.Context, lsn storage.LSN) error { - defer trace.StartRegion(ctx, "deleteLogEntry").End() - - tmpDir, err := os.MkdirTemp(mgr.stagingDirectory, "") - if err != nil { - return fmt.Errorf("mkdir temp: %w", err) - } - - logEntryPath := walFilesPathForLSN(mgr.stateDirectory, lsn) - // We can't delete a directory atomically as we have to first delete all of its content. - // If the deletion was interrupted, we'd be left with a corrupted log entry on the disk. - // To perform the deletion atomically, we move the to be deleted log entry out from the - // log into a temporary directory and sync the move. After that, the log entry is no longer - // in the log, and we can delete the files without having to worry about the deletion being - // interrupted and being left with a corrupted log entry. - if err := os.Rename(logEntryPath, filepath.Join(tmpDir, "to_delete")); err != nil { - return fmt.Errorf("rename: %w", err) - } - - if err := safe.NewSyncer().SyncParent(ctx, logEntryPath); err != nil { - return fmt.Errorf("sync file deletion: %w", err) - } - - // With the log entry removed from the log, we can now delete the files. There's no need - // to sync the deletions as the log entry is a temporary directory that will be removed - // on start up if they are left around from a crash. - if err := os.RemoveAll(tmpDir); err != nil { - return fmt.Errorf("remove files: %w", err) - } - - return nil -} - // readLogEntry returns the log entry from the given position in the log. 
func (mgr *TransactionManager) readLogEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { - manifestBytes, err := os.ReadFile(manifestPath(walFilesPathForLSN(mgr.stateDirectory, lsn))) + manifestBytes, err := os.ReadFile(manifestPath(mgr.logManager.GetEntryPath(lsn))) if err != nil { return nil, fmt.Errorf("read manifest: %w", err) } @@ -3813,7 +3566,14 @@ func (mgr *TransactionManager) readLogEntry(lsn storage.LSN) (*gitalypb.LogEntry // storeAppliedLSN stores the partition's applied LSN in the database. func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error { - return mgr.setKey(keyAppliedLSN, lsn.ToProto()) + mgr.testHooks.beforeStoreAppliedLSN() + + if err := mgr.setKey(keyAppliedLSN, lsn.ToProto()); err != nil { + return err + } + mgr.logManager.AcknowledgeAppliedPosition(lsn) + mgr.appliedLSN = lsn + return nil } // setKey marshals and stores a given protocol buffer message into the database under the given key. @@ -3846,37 +3606,6 @@ func (mgr *TransactionManager) readKey(key []byte, destination proto.Message) er }) } -// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than -// this mark are removed. -func (mgr *TransactionManager) lowWaterMark() storage.LSN { - mgr.mutex.Lock() - defer mgr.mutex.Unlock() - - // Greater than the maximum position of any consumer. - minConsumed := mgr.appliedLSN + 1 - - if mgr.consumer != nil { - // Position is the last acknowledged LSN, this is eligible for pruning. - // lowWaterMark returns the lowest LSN that cannot be pruned, so add one. - pos := mgr.consumerPos.getPosition() + 1 - if pos < minConsumed { - minConsumed = pos - } - } - - elm := mgr.committedEntries.Front() - if elm == nil { - return minConsumed - } - - committed := elm.Value.(*committedEntry).lsn - if minConsumed < committed { - return minConsumed - } - - return committed -} - // updateCommittedEntry updates the reader counter of the committed entry of the snapshot that this transaction depends on. func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN storage.LSN) *committedEntry { // Since the goroutine doing this is holding the lock, the snapshotLSN shouldn't change and no new transactions @@ -3906,17 +3635,17 @@ func (mgr *TransactionManager) walkCommittedEntries(transaction *Transaction, ca if committed.lsn <= transaction.snapshotLSN { continue } - entry, err := mgr.readLogEntry(committed.lsn) - if err != nil { + + if committed.entry == nil { return errCommittedEntryGone } // Transaction manager works on the partition level, including a repository and all of its pool // member repositories (if any). We need to filter log entries of the repository this // transaction targets. - if entry.GetRelativePath() != transaction.relativePath { + if committed.entry.GetRelativePath() != transaction.relativePath { continue } - if err := callback(entry, committed.objectDependencies); err != nil { + if err := callback(committed.entry, committed.objectDependencies); err != nil { return fmt.Errorf("callback: %w", err) } } @@ -3940,6 +3669,12 @@ func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) bool { } mgr.committedEntries.Remove(elm) + + // It's safe to drop the transaction from the conflict detection history as there are no transactions + // reading at an older snapshot. Since the changes are already in the transaction's snapshot, it would + // already base its changes on them. 
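// Editor's note: sketch only, not part of this change. The pruning rule encoded by the removed lowWaterMark above is
// now assumed to live in log.Manager: an entry may only be pruned once it has been applied and, if a consumer is
// registered, acknowledged by that consumer. The real rule additionally keeps entries still referenced by open
// transactions (the committedEntries list); that part is omitted here. Function and parameter names are illustrative.
func lowWaterMarkSketch(appliedLSN, consumerAckedLSN storage.LSN, hasConsumer bool) storage.LSN {
	// Everything strictly below the returned LSN is eligible for pruning.
	low := appliedLSN + 1

	if hasConsumer && consumerAckedLSN+1 < low {
		// A consumer that lags behind holds the low water mark back.
		low = consumerAckedLSN + 1
	}

	return low
}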
+ mgr.conflictMgr.EvictLSN(mgr.ctx, front.lsn) + removedAnyEntry = true elm = mgr.committedEntries.Front() } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index 0f3004e3a39240446c3281f8ed3c764a55e9ba6d..17029b01082b30c139adfb49a8fe8f2e070e59e1 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -89,10 +89,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 0, - HighWaterMark: 1, - }, }, }, { @@ -162,10 +158,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 1, - HighWaterMark: 1, - }, }, }, { @@ -256,14 +248,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 2, - HighWaterMark: 2, - }, }, }, { - desc: "dependent transaction blocks pruning acknowledged entry", + desc: "dependent transaction does not block pruning acknowledged entry", customSetup: customSetup, steps: steps{ StartManager{}, @@ -295,15 +283,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti Database: DatabaseState{ string(keyAppliedLSN): storage.LSN(1).ToProto(), }, - Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000001": {Mode: mode.Directory}, - "/wal/0000000000001/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/main", setup.Commits.First.OID)), - "/wal/0000000000001/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ - 1: {{"refs/heads/main": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, - })), + Directory: testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, Repositories: RepositoryStates{ setup.RelativePath: { DefaultBranch: "refs/heads/main", @@ -344,14 +327,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 1, - HighWaterMark: 1, - }, }, }, { - desc: "consumer position zeroed lsn on restart", + desc: "consumer position set to oldest lsn on restart", customSetup: customSetup, steps: steps{ StartManager{}, @@ -408,19 +387,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti Database: DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), }, - Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/other", setup.Commits.Second.OID)), - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte(setup.Commits.Second.OID + "\n")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/third", setup.Commits.Third.OID)), - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte(setup.Commits.Third.OID + "\n")}, - }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ - 2: 
{{"refs/heads/other": git.ReferenceUpdate{NewOID: setup.Commits.Second.OID}}}, - 3: {{"refs/heads/third": git.ReferenceUpdate{NewOID: setup.Commits.Third.OID}}}, - })), + Directory: testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, Repositories: RepositoryStates{ setup.RelativePath: { DefaultBranch: "refs/heads/main", @@ -483,10 +453,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 0, - HighWaterMark: 3, - }, }, }, { @@ -562,10 +528,6 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ), }, }, - Consumers: ConsumerState{ - ManagerPosition: 1, - HighWaterMark: 1, - }, }, }, } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 1b35818daeee8f2f5f4350006e14b8f107f269ef..d15322bf0556bea0bb77dd19324b12be0579bac8 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -25,11 +25,10 @@ type hookContext struct { // installHooks takes the hooks in the test setup and configures them in the TransactionManager. func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) { for destination, source := range map[*func()]hookFunc{ - &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, - &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, - &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, - &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, - &mgr.testHooks.beforeDeleteLogEntryFiles: hooks.AfterDeleteLogEntry, + &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, + &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, + &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, + &mgr.logManager.TestHooks.BeforeAppendLogEntry: hooks.BeforeAppendLogEntry, &mgr.testHooks.beforeRunExiting: func(hookContext) { if hooks.WaitForTransactionsWhenClosing { inflightTransactions.Wait() diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index edbe3e726543e7f050696fd18cce656db6c447ac..eccf30f3f62b45921ea637dc6ae62a2db162602c 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -29,6 +29,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/refdb" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -1401,7 +1402,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio // // The Manager starts up and we expect the pack file to be gone at the end of the test. 
ModifyStorage: func(_ testing.TB, _ config.Cfg, storagePath string) { - packFilePath := packFilePath(walFilesPathForLSN(filepath.Join(storagePath, setup.RelativePath), 1)) + packFilePath := packFilePath(log.EntryPath(filepath.Join(storagePath, setup.RelativePath), 1)) require.NoError(t, os.MkdirAll(filepath.Dir(packFilePath), mode.Directory)) require.NoError(t, os.WriteFile( packFilePath, @@ -1723,15 +1724,12 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t require.Equal(t, expected[i].snapshotReaders, actual.snapshotReaders) if expected[i].entry != nil { - actualEntry, err := manager.readLogEntry(actual.lsn) - require.NoError(t, err) - expectedEntry := expected[i].entry if testhelper.IsReftableEnabled() { for idx, op := range expectedEntry.GetOperations() { if chl := op.GetCreateHardLink(); chl != nil { - actualCHL := actualEntry.GetOperations()[idx].GetCreateHardLink() + actualCHL := actual.entry.GetOperations()[idx].GetCreateHardLink() require.NotNil(t, actualCHL) if filepath.Base(string(actualCHL.GetDestinationPath())) == "tables.list" { @@ -1746,7 +1744,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t } } - testhelper.ProtoEqual(t, expectedEntry, actualEntry) + testhelper.ProtoEqual(t, expectedEntry, actual.entry) } i++ } @@ -2116,20 +2114,12 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) // Transaction 2 and 3 are left-over. - testhelper.RequireDirectoryState(t, tm.stateDirectory, "", - gittest.FilesOrReftables(testhelper.DirectoryState{ - "/": {Mode: mode.Directory}, - "/wal": {Mode: mode.Directory}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-1", setup.Commits.First.OID)), - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-2", setup.Commits.First.OID)), - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ - 2: {{"refs/heads/branch-1": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, - 3: {{"refs/heads/branch-2": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, - }))) + testhelper.RequireDirectoryState(t, tm.logManager.StateDirectory(), "", + testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, + ) }), StartManager{}, AssertManager{}, @@ -2140,7 +2130,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(3)) - require.Equal(t, tm.appendedLSN, storage.LSN(3)) + require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(3)) }), }, expectedState: StateAssertion{ @@ -2267,21 +2257,16 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t logEntryPath := filepath.Join(t.TempDir(), "log_entry") require.NoError(t, os.Mkdir(logEntryPath, mode.Directory)) require.NoError(t, os.WriteFile(filepath.Join(logEntryPath, "1"), []byte(setup.Commits.First.OID+"\n"), mode.File)) - require.NoError(t, tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", 
setup.Commits.First.OID), logEntryPath)) + err := tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath) + require.NoError(t, err) RequireDatabase(t, ctx, tm.db, DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), }) // Transaction 2 and 3 are left-over. - testhelper.RequireDirectoryState(t, tm.stateDirectory, "", testhelper.DirectoryState{ + testhelper.RequireDirectoryState(t, tm.logManager.StateDirectory(), "", testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, - "/wal/0000000000002": {Mode: mode.Directory}, - "/wal/0000000000002/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-1", setup.Commits.First.OID)), - "/wal/0000000000002/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, - "/wal/0000000000003": {Mode: mode.Directory}, - "/wal/0000000000003/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-2", setup.Commits.First.OID)), - "/wal/0000000000003/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, "/wal/0000000000004": {Mode: mode.Directory}, "/wal/0000000000004/MANIFEST": manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID)), "/wal/0000000000004/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, @@ -2296,7 +2281,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(4).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(4)) - require.Equal(t, tm.appendedLSN, storage.LSN(4)) + require.Equal(t, tm.logManager.AppendedLSN(), storage.LSN(4)) }), }, expectedState: StateAssertion{