diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 11a8713663dcac9e8f672303491b619d44f1092d..9b867f4981c5eda648536045af1017eb94f4003a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -257,7 +257,7 @@ test: TEST_TARGET: test USE_MESON: YesPlease - TEST_TARGET: - [test-with-praefect, race-go, test-wal, test-with-praefect-wal] + [test-with-praefect, race-go, test-wal, test-raft, test-with-praefect-wal] # We also verify that things work as expected with a non-bundled Git # version matching our minimum required Git version. - TEST_TARGET: test @@ -313,7 +313,7 @@ test:nightly: parallel: matrix: - GIT_VERSION: ["master", "next"] - TEST_TARGET: [test, test-with-praefect, test-with-reftable, test-with-sha256] + TEST_TARGET: [test, test-with-praefect, test-with-reftable, test-with-sha256, test-wal, test-raft] rules: - if: '$CI_PIPELINE_SOURCE == "schedule"' allow_failure: false @@ -332,7 +332,7 @@ test:sha256: parallel: matrix: - TEST_TARGET: - [test, test-with-praefect, test-wal, test-with-praefect-wal] + [test, test-with-praefect, test-wal, test-raft, test-with-praefect-wal] TEST_WITH_SHA256: "YesPlease" test:fips: @@ -355,7 +355,7 @@ test:fips: - test "$(cat /proc/sys/crypto/fips_enabled)" = "1" || (echo "System is not running in FIPS mode" && exit 1) parallel: matrix: - - TEST_TARGET: [test, test-with-praefect] + - TEST_TARGET: [test, test-with-praefect, test-wal, test-raft] FIPS_MODE: "YesPlease" GO_VERSION: !reference [.versions, go_supported] rules: diff --git a/Makefile b/Makefile index e56a4cc21a1b7a3b9adf82d95d1b98773724893e..b30b656d411d5cb9db252d12c517ea264d5b69da 100644 --- a/Makefile +++ b/Makefile @@ -440,6 +440,12 @@ test-with-praefect: test-go test-wal: export GITALY_TEST_WAL = YesPlease test-wal: test-go +.PHONY: test-raft +## Run Go tests with write-ahead logging + Raft enabled. 
+test-raft: export GITALY_TEST_WAL = YesPlease +test-raft: export GITALY_TEST_RAFT = YesPlease +test-raft: test-go + .PHONY: test-with-praefect-wal ## Run Go tests with write-ahead logging and Praefect enabled. test-with-praefect-wal: export GITALY_TEST_WAL = YesPlease diff --git a/internal/backup/partition_backup_test.go b/internal/backup/partition_backup_test.go index 38ca581b11211656a73f79f703d31cf66ecaa7d5..917334df6f34eba17e877bc31ad9f6192f53b9fe 100644 --- a/internal/backup/partition_backup_test.go +++ b/internal/backup/partition_backup_test.go @@ -114,6 +114,10 @@ func TestPartitionBackup_CreateSuccess(t *testing.T) { require.NoError(t, err) + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) + for _, expectedArchive := range tc.expectedArchives { tarPath := filepath.Join(backupRoot, "partition-backups", cfg.Storages[0].Name, expectedArchive, storage.LSN(1).String()) + ".tar" tar, err := os.Open(tarPath) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 7dee9d0410916a680ab769933b57a03c20e218e2..512eb2c3f044ceda03d963b9e4f901b3f730fdbe 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -371,9 +371,10 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { storageMetrics := storagemgr.NewMetrics(cfg.Prometheus) housekeepingMetrics := housekeeping.NewMetrics(cfg.Prometheus) + raftMetrics := raftmgr.NewMetrics() partitionMetrics := partition.NewMetrics(housekeepingMetrics) migrationMetrics := migration.NewMetrics() - prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics) + prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics) migrations := []migration.Migration{} @@ -409,6 +410,11 @@ func 
run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { logConsumer = walArchiver } + var raftFactory raftmgr.RaftManagerFactory + if cfg.Raft.Enabled { + raftFactory = raftmgr.DefaultFactory(cfg.Raft) + } + nodeMgr, err := nodeimpl.NewManager( cfg.Storages, storagemgr.NewFactory( @@ -420,6 +426,8 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { localrepoFactory, partitionMetrics, logConsumer, + cfg.Raft, + raftFactory, ), migrationMetrics, migrations, @@ -469,6 +477,8 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { localrepoFactory, partitionMetrics, nil, + cfg.Raft, + nil, ), // In recovery mode we don't want to keep inactive partitions active. The cache // however can't be disabled so simply set it to one. @@ -543,13 +553,12 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("resolve backup locator: %w", err) } } - raftTransport := &raftmgr.GrpcTransport{} + + var raftTransport *raftmgr.GrpcTransport if cfg.Raft.Enabled { raftManagerRegistry := raftmgr.NewRaftManagerRegistry() routingTable := raftmgr.NewStaticRaftRoutingTable() raftTransport = raftmgr.NewGrpcTransport(logger, cfg, routingTable, raftManagerRegistry, conns) - raftSnapshotterMetrics := raftmgr.NewMetrics() - prometheus.MustRegister(raftSnapshotterMetrics) } var bundleURIManager *bundleuri.GenerationManager diff --git a/internal/cli/gitaly/subcmd_recovery.go b/internal/cli/gitaly/subcmd_recovery.go index 275543cacd44a6ea0561baff12ce82b87e06610b..887342c2fb90e10b4f5582b02e968135d4fc5b15 100644 --- a/internal/cli/gitaly/subcmd_recovery.go +++ b/internal/cli/gitaly/subcmd_recovery.go @@ -562,6 +562,8 @@ func setupRecoveryContext(ctx *cli.Context) (rc recoveryContext, returnErr error localrepo.NewFactory(logger, config.NewLocator(cfg), gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), migration.NewMetrics(), 
[]migration.Migration{}, diff --git a/internal/cli/gitaly/subcmd_recovery_test.go b/internal/cli/gitaly/subcmd_recovery_test.go index 24f89366c90c2ac2fdc0f723a71bc622588c28df..28d8422a133e2a387e42078ef7b6181d6750c361 100644 --- a/internal/cli/gitaly/subcmd_recovery_test.go +++ b/internal/cli/gitaly/subcmd_recovery_test.go @@ -53,6 +53,8 @@ type setupData struct { func TestRecoveryCLI_status(t *testing.T) { t.Parallel() + testhelper.SkipWithRaft(t, "Raft must not be enabled during recovery") + for _, tc := range []struct { desc string setup func(tb testing.TB, ctx context.Context, opts setupOptions) setupData @@ -342,6 +344,8 @@ Available WAL backup entries: up to LSN: %s`, localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), 1, storagemgr.NewMetrics(cfg.Prometheus), @@ -377,6 +381,8 @@ Available WAL backup entries: up to LSN: %s`, func TestRecoveryCLI_replay(t *testing.T) { t.Parallel() + testhelper.SkipWithRaft(t, "Raft must not be enabled during recovery") + for _, tc := range []struct { desc string setup func(tb testing.TB, ctx context.Context, opts setupOptions) setupData @@ -650,6 +656,8 @@ Successfully processed log entries up to LSN: %s`, localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), 1, storagemgr.NewMetrics(cfg.Prometheus), @@ -697,6 +705,8 @@ Successfully processed log entries up to LSN: %s`, localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), 1, storagemgr.NewMetrics(cfg.Prometheus), diff --git a/internal/cli/gitalybackup/partition_test.go b/internal/cli/gitalybackup/partition_test.go index 615ec856618e11a1aad436ef4481cac725fa6f67..737837dab919dbfa058c4249eb74321b6b08dbd6 100644 --- a/internal/cli/gitalybackup/partition_test.go +++ 
b/internal/cli/gitalybackup/partition_test.go @@ -110,6 +110,10 @@ func TestPartitionSubcommand_Create(t *testing.T) { } require.NoError(t, err) + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) + lsn := storage.LSN(1) tarPath := filepath.Join(path, "partition-backups", cfg.Storages[0].Name, "2", lsn.String()) + ".tar" tar, err := os.Open(tarPath) diff --git a/internal/git/housekeeping/manager/testhelper_test.go b/internal/git/housekeeping/manager/testhelper_test.go index f1b463bfac47bdf0e2ca734fa22e743576f1f8d0..5db1581b9d416bbd0911f0a28ca17c272827ef5f 100644 --- a/internal/git/housekeeping/manager/testhelper_test.go +++ b/internal/git/housekeeping/manager/testhelper_test.go @@ -109,6 +109,8 @@ func testWithAndWithoutTransaction(t *testing.T, ctx context.Context, desc strin localRepoFactory, partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), storagemgr.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus), diff --git a/internal/git/objectpool/fetch_test.go b/internal/git/objectpool/fetch_test.go index e50efd05e5a549f11b87a3033c82c93f79106589..6ce2c8d69917629a7cec08b0cac7cc77c35d3559 100644 --- a/internal/git/objectpool/fetch_test.go +++ b/internal/git/objectpool/fetch_test.go @@ -434,6 +434,8 @@ func testWithAndWithoutTransaction(t *testing.T, ctx context.Context, testFunc f localRepoFactory, partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), storagemgr.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus), diff --git a/internal/gitaly/service/partition/backup_partition_test.go b/internal/gitaly/service/partition/backup_partition_test.go index 564180929ab76c4e9855ca65b8879aef477e7b1e..6f2174229c10da66ea8cd223c07a8dca2a6d5b78 100644 --- 
a/internal/gitaly/service/partition/backup_partition_test.go +++ b/internal/gitaly/service/partition/backup_partition_test.go @@ -30,6 +30,9 @@ func TestBackupPartition(t *testing.T) { if testhelper.IsPraefectEnabled() { t.Skip(`Praefect currently doesn't support routing the PARTITION scoped RPC messages.`) } + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) type setupData struct { cfg config.Cfg @@ -251,6 +254,9 @@ func TestBackupPartition_BackupExists(t *testing.T) { if testhelper.IsPraefectEnabled() { t.Skip(`Praefect currently doesn't support routing the PARTITION scoped RPC messages.`) } + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) ctx := testhelper.Context(t) diff --git a/internal/gitaly/service/partition/list_partitions_test.go b/internal/gitaly/service/partition/list_partitions_test.go index e34605c2d5b59b55ed46996922a042130bfebc4d..76d0af24cfc3e80832d089378ebd0bd5a9c3dae3 100644 --- a/internal/gitaly/service/partition/list_partitions_test.go +++ b/internal/gitaly/service/partition/list_partitions_test.go @@ -19,6 +19,9 @@ func TestListPartitions(t *testing.T) { t.Skip(`Since it is not guaranteed that all the gitaly instances for a given Praefect will have the same partition IDs, this RPC will not be supported for Praefect`) } + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is enabled, the LSN is not static. 
The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) ctx := testhelper.Context(t) cfg, ptnClient, repoClient := setupServices(t) diff --git a/internal/gitaly/storage/raftmgr/event_registry.go b/internal/gitaly/storage/raftmgr/event_registry.go index 468a95dc70930b2691e0574d5fbc4a14a2874082..0b93909fe9e98bd1078556556ed20af829742eff 100644 --- a/internal/gitaly/storage/raftmgr/event_registry.go +++ b/internal/gitaly/storage/raftmgr/event_registry.go @@ -1,16 +1,11 @@ package raftmgr import ( - "fmt" "sync" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" ) -// ErrObsoleted is returned when an event associated with a LSN is shadowed by another one with higher term. That event -// must be unlocked and removed from the registry. -var ErrObsoleted = fmt.Errorf("obsoleted event, shadowed by a log entry with higher term") - // EventID uniquely identifies an event in the registry. type EventID uint64 @@ -18,8 +13,7 @@ type EventID uint64 type Waiter struct { ID EventID LSN storage.LSN - C chan struct{} - Err error + C chan error } // Registry manages events and their associated waiters, enabling the registration @@ -47,7 +41,7 @@ func (r *Registry) Register() *Waiter { r.nextEventID++ waiter := &Waiter{ ID: r.nextEventID, - C: make(chan struct{}), + C: make(chan error, 1), } r.waiters[r.nextEventID] = waiter @@ -67,8 +61,9 @@ func (r *Registry) AssignLSN(id EventID, lsn storage.LSN) { waiter.LSN = lsn } -// UntrackSince untracks all events having LSNs greater than or equal to the input LSN. -func (r *Registry) UntrackSince(lsn storage.LSN) { +// UntrackSince untracks all events having LSNs greater than or equal to the input LSN. The input error is assigned to +// impacted events. 
+func (r *Registry) UntrackSince(lsn storage.LSN, err error) { r.mu.Lock() defer r.mu.Unlock() @@ -79,12 +74,24 @@ func (r *Registry) UntrackSince(lsn storage.LSN) { } } for _, id := range toRemove { + r.waiters[id].C <- err close(r.waiters[id].C) - r.waiters[id].Err = ErrObsoleted delete(r.waiters, id) } } +// UntrackAll untracks all events. The input error is assigned to impacted events. +func (r *Registry) UntrackAll(err error) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, w := range r.waiters { + w.C <- err + close(w.C) + } + clear(r.waiters) +} + // Untrack closes the channel associated with a given EventID and removes the waiter from the registry once the event is // committed. This function returns if the registry is still tracking the event. func (r *Registry) Untrack(id EventID) bool { diff --git a/internal/gitaly/storage/raftmgr/event_registry_test.go b/internal/gitaly/storage/raftmgr/event_registry_test.go index 8c55f65a5787951f20c11512072c086ee35665af..090e3f61269f6a119b9e9459791b57b3d305806f 100644 --- a/internal/gitaly/storage/raftmgr/event_registry_test.go +++ b/internal/gitaly/storage/raftmgr/event_registry_test.go @@ -1,6 +1,7 @@ package raftmgr import ( + "fmt" "sync" "testing" "time" @@ -54,7 +55,7 @@ func TestRegistry_Untrack(t *testing.T) { action: func(t *testing.T, r *Registry) []*Waiter { require.False(t, r.Untrack(1234), "event should not be tracked") - c := make(chan struct{}) + c := make(chan error, 1) close(c) return []*Waiter{{ID: 99999, C: c}} // Non-existent event }, @@ -113,7 +114,7 @@ func TestRegistry_UntrackSince(t *testing.T) { registry.AssignLSN(waiter3.ID, 12) // Call UntrackSince with threshold LSN - registry.UntrackSince(11) + registry.UntrackSince(11, fmt.Errorf("a random error")) // Waiters with LSN > 10 should be obsoleted select { @@ -123,17 +124,17 @@ func TestRegistry_UntrackSince(t *testing.T) { // Waiter1 should not be closed } select { - case <-waiter2.C: + case err := <-waiter2.C: // Expected behavior, channel 
closed - require.Equal(t, ErrObsoleted, waiter2.Err) + require.Equal(t, fmt.Errorf("a random error"), err) default: t.Fatalf("Expected channel for event %d to be closed", waiter2.ID) } select { - case <-waiter3.C: + case err := <-waiter3.C: // Expected behavior, channel closed - require.Equal(t, ErrObsoleted, waiter3.Err) + require.Equal(t, fmt.Errorf("a random error"), err) default: t.Fatalf("Expected channel for event %d to be closed", waiter3.ID) } @@ -145,6 +146,38 @@ func TestRegistry_UntrackSince(t *testing.T) { require.False(t, registry.Untrack(waiter3.ID)) } +func TestRegistry_UntrackAll(t *testing.T) { + t.Parallel() + registry := NewRegistry() + + waiter1 := registry.Register() + waiter2 := registry.Register() + waiter3 := registry.Register() + + // Assign LSNs + registry.AssignLSN(waiter1.ID, 10) + registry.AssignLSN(waiter2.ID, 11) + registry.AssignLSN(waiter3.ID, 12) + + // Call UntrackAll to release every waiter with the given error + registry.UntrackAll(fmt.Errorf("a random error")) + + for _, w := range []*Waiter{waiter1, waiter2, waiter3} { + select { + case err := <-w.C: + // Expected behavior, channel closed + require.Equal(t, fmt.Errorf("a random error"), err) + default: + t.Fatalf("Expected channel for event %d to be closed", w.ID) + } + } + + // None of the waiters should be tracked anymore + require.False(t, registry.Untrack(waiter1.ID)) + require.False(t, registry.Untrack(waiter2.ID)) + require.False(t, registry.Untrack(waiter3.ID)) +} + func TestRegistry_ConcurrentAccess(t *testing.T) { t.Parallel() const numEvents = 100 diff --git a/internal/gitaly/storage/raftmgr/grpc_transport_test.go b/internal/gitaly/storage/raftmgr/grpc_transport_test.go index 78892881e5d2b4e0329a0129730459fddbb67126..2b2db087905348603962a64481d119d1ec4d0386 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport_test.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport_test.go @@ -145,10 +145,10 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) { require.NoError(t, err) // Create test 
messages - msgs := createTestMessages(t, testCluster, mgr.GetLogReader(), tc.walEntries) + msgs := createTestMessages(t, testCluster, mgr, tc.walEntries) // Send Message from leader to all followers - err = leader.transport.Send(ctx, mgr.GetLogReader(), 1, storageName, msgs) + err = leader.transport.Send(ctx, mgr, 1, storageName, msgs) if tc.expectedError != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expectedError) diff --git a/internal/gitaly/storage/raftmgr/hooks.go b/internal/gitaly/storage/raftmgr/hooks.go new file mode 100644 index 0000000000000000000000000000000000000000..ec4ba9b7a9f0f1481ac5c884298bacfb1c829200 --- /dev/null +++ b/internal/gitaly/storage/raftmgr/hooks.go @@ -0,0 +1,38 @@ +package raftmgr + +// testHooks defines insertion points for testing various stages of Raft operations. +type testHooks struct { + // BeforeInsertLogEntry is called before inserting a log entry at the specified index. + BeforeInsertLogEntry func(index uint64) + + // BeforeSaveHardState is called before persisting a new hard state. + BeforeSaveHardState func() + + // BeforePropose is called before proposing a new log entry with the given path. + BeforePropose func(path string) + + // BeforeProcessCommittedEntries is called before processing committed entries from a Ready state. + BeforeProcessCommittedEntries func() + + // BeforeNodeAdvance is called before advancing the Raft node's state. + BeforeNodeAdvance func() + + // BeforeSendMessages is called before sending messages to other Raft nodes. + BeforeSendMessages func() + + // BeforeHandleReady is called before processing a new Ready state from the Raft node. + BeforeHandleReady func() +} + +// noopHooks returns a testHooks instance with every hook set to a no-op function. 
+func noopHooks() testHooks { + return testHooks{ + BeforeInsertLogEntry: func(uint64) {}, + BeforeSaveHardState: func() {}, + BeforePropose: func(string) {}, + BeforeProcessCommittedEntries: func() {}, + BeforeNodeAdvance: func() {}, + BeforeSendMessages: func() {}, + BeforeHandleReady: func() {}, + } +} diff --git a/internal/gitaly/storage/raftmgr/manager.go b/internal/gitaly/storage/raftmgr/manager.go index 64769fdb9a3d74ef78051347a6ab1ab865708214..a3fd5becba5ffdff9a96f245ccc669458b8949ba 100644 --- a/internal/gitaly/storage/raftmgr/manager.go +++ b/internal/gitaly/storage/raftmgr/manager.go @@ -2,14 +2,763 @@ package raftmgr import ( "context" + "errors" + "fmt" + "runtime" + "sync" + "time" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/safe" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "google.golang.org/protobuf/proto" ) // RaftManager is an interface that defines the methods to orchestrate the Raft consensus protocol. type RaftManager interface { - GetEntryPath(storage.LSN) string - GetLogReader() storage.LogReader + // Embed all LogManager methods for WAL operations + storage.LogManager + + // Initialize prepares the Raft system with the given context and last applied LSN + Initialize(ctx context.Context, appliedLSN storage.LSN) error + + // Step processes a Raft message from a remote node Step(ctx context.Context, msg raftpb.Message) error } + +var ( + // ErrObsoleted is returned when an event associated with a LSN is shadowed by another one with higher term. That event + // must be unlocked and removed from the registry. 
+ ErrObsoleted = fmt.Errorf("event is obsolete, superseded by a recent log entry with higher term") + // ErrReadyTimeout is returned when the manager times out when waiting for Raft group to be ready. + ErrReadyTimeout = fmt.Errorf("ready timeout exceeded") + // ErrManagerStopped is returned when the main loop of Raft manager stops. + ErrManagerStopped = fmt.Errorf("raft manager stopped") +) + +const ( + // Maximum size of individual Raft messages + defaultMaxSizePerMsg = 10 * 1024 * 1024 + // Maximum number of in-flight Raft messages. This controls how many messages can be sent without acknowledgment + defaultMaxInflightMsgs = 256 +) + +// ready manages the readiness signaling of the Raft system. +type ready struct { + c chan error // Channel used to signal readiness + once sync.Once // Ensures the readiness signal is sent exactly once +} + +// set signals readiness by closing the channel exactly once. +func (r *ready) set(err error) { + r.once.Do(func() { + r.c <- err + close(r.c) + }) +} + +// ManagerOptions configures optional parameters for the Raft Manager. +type ManagerOptions struct { + // readyTimeout sets the maximum duration to wait for Raft to become ready + readyTimeout time.Duration + // opTimeout sets the maximum duration for propose, append, and commit operations + // This is primarily used in testing to detect deadlocks and performance issues + opTimeout time.Duration + // entryRecorder stores Raft log entries for testing purposes + entryRecorder *EntryRecorder +} + +// OptionFunc defines a function type for configuring ManagerOptions. +type OptionFunc func(opt ManagerOptions) ManagerOptions + +// WithReadyTimeout sets the maximum duration to wait for Raft to become ready. +// The default timeout is 5 times the election timeout. +func WithReadyTimeout(t time.Duration) OptionFunc { + return func(opt ManagerOptions) ManagerOptions { + opt.readyTimeout = t + return opt + } +} + +// WithOpTimeout sets a timeout for individual Raft operations. 
+// This should only be used in testing environments. +func WithOpTimeout(t time.Duration) OptionFunc { + return func(opt ManagerOptions) ManagerOptions { + opt.opTimeout = t + return opt + } +} + +// WithEntryRecorder enables recording of Raft log entries for testing. +func WithEntryRecorder(recorder *EntryRecorder) OptionFunc { + return func(opt ManagerOptions) ManagerOptions { + opt.entryRecorder = recorder + return opt + } +} + +// Manager orchestrates the Raft consensus protocol for a Gitaly partition. +// It manages configuration, state synchronization, and communication between members. +// The Manager implements the storage.LogManager interface. +type Manager struct { + mutex sync.Mutex + + ctx context.Context // Context for controlling manager's lifecycle + cancel context.CancelFunc + + authorityName string // Name of the storage this partition belongs to + ptnID storage.PartitionID // Unique identifier for the managed partition + node raft.Node // etcd/raft node representation + raftCfg config.Raft // etcd/raft configurations + options ManagerOptions // Additional manager configuration + logger logging.Logger // Internal logging + storage *Storage // Persistent storage for Raft logs and state + registry *Registry // Event tracking + leadership *Leadership // Current leadership information + syncer safe.Syncer // Synchronization operations + wg sync.WaitGroup // Goroutine lifecycle management + ready *ready // Initialization state tracking + started bool // Indicates if manager has been started + + // notifyQueue signals new changes or errors to clients + // Clients must process signals promptly to prevent blocking + notifyQueue chan error + + // EntryRecorder stores Raft log entries for testing + EntryRecorder *EntryRecorder + + // hooks is a collection of hooks, used in test environment to intercept critical events + hooks testHooks +} + +// applyOptions creates and validates manager options by applying provided option functions +// to a default 
configuration. +func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ManagerOptions, error) { + baseRTT := time.Duration(raftCfg.RTTMilliseconds) * time.Millisecond + options := ManagerOptions{ + // Default readyTimeout is 5 times the election timeout to allow for initial self-elections + readyTimeout: 5 * time.Duration(raftCfg.ElectionTicks) * baseRTT, + } + + for _, opt := range opts { + options = opt(options) + } + + if options.readyTimeout == 0 { + return options, fmt.Errorf("readyTimeout must not be zero") + } else if options.readyTimeout < time.Duration(raftCfg.ElectionTicks)*baseRTT { + return options, fmt.Errorf("readyTimeout must not be less than election timeout") + } + + return options, nil +} + +// RaftManagerFactory defines a function type that creates a new Raft Manager instance. +type RaftManagerFactory func( + storageName string, + partitionID storage.PartitionID, + raftStorage *Storage, + logger logging.Logger, +) (*Manager, error) + +// DefaultFactory returns a RaftManagerFactory that returns a manager from input raft config +func DefaultFactory(raftCfg config.Raft) RaftManagerFactory { + return func( + storageName string, + partitionID storage.PartitionID, + raftStorage *Storage, + logger logging.Logger, + ) (*Manager, error) { + return NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + } +} + +// NewManager creates an instance of Manager. 
+func NewManager( + authorityName string, + partitionID storage.PartitionID, + raftCfg config.Raft, + raftStorage *Storage, + logger logging.Logger, + opts ...OptionFunc, +) (*Manager, error) { + if !raftCfg.Enabled { + return nil, fmt.Errorf("raft is not enabled") + } + + options, err := applyOptions(raftCfg, opts) + if err != nil { + return nil, fmt.Errorf("invalid raft manager option: %w", err) + } + + logger = logger.WithFields(logging.Fields{ + "component": "raft", + "raft.authority": authorityName, + "raft.partition": partitionID, + }) + + return &Manager{ + authorityName: authorityName, + ptnID: partitionID, + raftCfg: raftCfg, + options: options, + storage: raftStorage, + logger: logger, + registry: NewRegistry(), + syncer: safe.NewSyncer(), + leadership: NewLeadership(), + ready: &ready{c: make(chan error, 1)}, + notifyQueue: make(chan error, 1), + EntryRecorder: options.entryRecorder, + hooks: noopHooks(), + }, nil +} + +// Initialize starts the Raft manager by: +// - Loading or bootstrapping the Raft state +// - Initializing the etcd/raft Node +// - Starting the processing goroutine +// +// The appliedLSN parameter indicates the last log sequence number that was fully applied +// to the partition's state. This ensures that Raft processing begins from the correct point +// in the log history. +func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) error { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + if mgr.started { + return fmt.Errorf("raft manager for partition %q already started", mgr.ptnID) + } + mgr.started = true + + mgr.ctx, mgr.cancel = context.WithCancel(ctx) + + bootstrapped, err := mgr.storage.initialize(ctx, appliedLSN) + if err != nil { + return fmt.Errorf("failed to load raft initial state: %w", err) + } + + // etcd/raft uses an integer ID to identify a member of a group. This ID is incremented whenever a new member + // joins the group. 
This node ID system yields some benefits: + // - No need to set the node ID statically, avoiding the need for a composite key of the storage + // name and node ID. + // - No need for a global node registration system, as IDs are ephemeral. + // - Works better in scenarios where a member leaves and then re-joins the cluster. Each joining event leads to + // a new unique node ID. + // Currently, Gitaly only supports single-node Raft clusters, so we use a fixed node ID of 1. In a future + // implementation of multi-node clusters, each node will get a unique ID when joining the cluster. + // https://gitlab.com/gitlab-org/gitaly/-/issues/6304 tracks the work to bootstrap new cluster members. + var nodeID uint64 = 1 + + config := &raft.Config{ + ID: nodeID, + ElectionTick: int(mgr.raftCfg.ElectionTicks), + HeartbeatTick: int(mgr.raftCfg.HeartbeatTicks), + Storage: mgr.storage, + MaxSizePerMsg: defaultMaxSizePerMsg, + MaxInflightMsgs: defaultMaxInflightMsgs, + Logger: &raftLogger{logger: mgr.logger.WithFields(logging.Fields{"raft.component": "manager"})}, + // We disable automatic proposal forwarding provided by etcd/raft because it would bypass Gitaly's + // transaction validation system. In Gitaly, each transaction is verified against the latest state + // before being committed. If proposal forwarding is enabled, replica nodes would have the ability to + // start transactions independently and propose them to the leader for commit. + // + // Replica: A -> B -> C -> Start D ------------> Forward to Leader ----| + // Leader: A -> B -> C -> Start E -> Commit E ----------------------> Receive D -> Commit D + // In this scenario, D is not verified against E even though E commits before D. + // + // Instead, we'll implement explicit request routing at the RPC layer to ensure all writes go through + // proper verification on the leader. 
+ // See https://gitlab.com/gitlab-org/gitaly/-/issues/6465 + DisableProposalForwarding: true, + } + + if !bootstrapped { + // For first-time bootstrap, initialize with self as the only peer + peers := []raft.Peer{{ID: nodeID}} + mgr.node = raft.StartNode(config, peers) + } else { + // For restarts, set Applied to latest committed LSN + // WAL considers entries committed once they are in the Raft log + config.Applied = uint64(mgr.storage.committedLSN) + mgr.node = raft.RestartNode(config) + } + + mgr.wg.Add(1) // Must happen before the goroutine starts so Close() cannot pass wg.Wait() before run() is counted. + go mgr.run(bootstrapped) + + select { + case <-time.After(mgr.options.readyTimeout): + return ErrReadyTimeout + case err := <-mgr.ready.c: + return err + } +} + +// run executes the main Raft event loop, processing ticks, ready states, and notifications. +func (mgr *Manager) run(bootstrapped bool) { + defer mgr.wg.Done() // Paired with the wg.Add(1) performed by Initialize right before starting this goroutine. + + ticker := time.NewTicker(time.Duration(mgr.raftCfg.RTTMilliseconds) * time.Millisecond) + defer ticker.Stop() + + // For bootstrapped clusters, mark ready immediately since state is already established + // For new clusters, wait for first config change + if bootstrapped { + mgr.signalReady() + } + + // Main event processing loop + for { + select { + case <-ticker.C: + // Drive the etcd/raft internal clock + // Election and timeout depend on tick count + mgr.node.Tick() + case rd, ok := <-mgr.node.Ready(): + if err := mgr.safeExec(func() error { + if !ok { + return fmt.Errorf("raft node Ready channel unexpectedly closed") + } + if err := mgr.handleReady(&rd); err != nil { + return err + } + mgr.hooks.BeforeNodeAdvance() + mgr.node.Advance() + return nil + }); err != nil { + mgr.handleFatalError(err) + return + } + case err := <-mgr.storage.localLog.GetNotificationQueue(): + // Forward storage notifications + if err == nil { + select { + case mgr.notifyQueue <- nil: + default: + // Non-critical if we can't send a nil notification + } + } else { + mgr.handleFatalError(err) + return + } + + case <-mgr.ctx.Done(): + err := mgr.ctx.Err() + 
if !errors.Is(err, context.Canceled) { + mgr.handleFatalError(err) + } + return + } + } +} + +// safeExec executes a function and recovers from panics, converting them to errors +func (mgr *Manager) safeExec(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + switch v := r.(type) { + case error: + err = fmt.Errorf("panic recovered: %w", v) + default: + err = fmt.Errorf("panic recovered: %v", r) + } + // Capture stack trace for debugging + stack := make([]byte, 4096) + stack = stack[:runtime.Stack(stack, false)] + + err := fmt.Errorf("raft manager panic: %v", r) + mgr.logger.WithError(err).WithField("error.stack", string(stack)).Error("raft manager panic recovered") + } + }() + + return fn() +} + +// handleFatalError handles a fatal error that requires the run loop to terminate +func (mgr *Manager) handleFatalError(err error) { + // Set back to ready to unlock the caller of Initialize(). + mgr.signalError(ErrManagerStopped) + + mgr.logger.WithError(err).Error("raft event loop failed") + + // Unlock all waiters of AppendLogEntry about the manager being stopped. + mgr.registry.UntrackAll(ErrManagerStopped) + + // Ensure error is sent to notification queue. + mgr.notifyQueue <- err +} + +// Close gracefully shuts down the Raft manager and its components. +func (mgr *Manager) Close() error { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + if !mgr.started { + return nil + } + + mgr.node.Stop() + mgr.cancel() + mgr.wg.Wait() + + return mgr.storage.close() +} + +// GetNotificationQueue returns the channel used to notify external components of changes. +func (mgr *Manager) GetNotificationQueue() <-chan error { + return mgr.notifyQueue +} + +// GetEntryPath returns the filesystem path for a given log entry. 
+func (mgr *Manager) GetEntryPath(lsn storage.LSN) string {
+	return mgr.storage.localLog.GetEntryPath(lsn)
+}
+
+// AcknowledgePosition marks log entries up to and including the given LSN
+// as successfully processed for the specified position type. Raft manager
+// doesn't handle this directly. It propagates to the local log manager.
+func (mgr *Manager) AcknowledgePosition(t storage.PositionType, lsn storage.LSN) error {
+	return mgr.storage.localLog.AcknowledgePosition(t, lsn)
+}
+
+// AppendedLSN returns the LSN of the most recently appended log entry. For the Raft manager this
+// is the storage's committed LSN.
+func (mgr *Manager) AppendedLSN() storage.LSN {
+	return mgr.storage.committedLSN
+}
+
+// LowWaterMark returns the earliest LSN that should be retained.
+// Log entries before this LSN can be safely removed.
+func (mgr *Manager) LowWaterMark() storage.LSN {
+	// The method has no error return, so an error from FirstIndex is deliberately discarded and
+	// whatever index FirstIndex returned alongside it is used.
+	// NOTE(review): presumably that is the zero LSN on failure; confirm against Storage.FirstIndex.
+	lsn, _ := mgr.storage.FirstIndex()
+	return storage.LSN(lsn)
+}
+
+// AppendLogEntry proposes a new log entry to the cluster.
+// It blocks until the entry is committed, timeout occurs, or the cluster rejects it.
+// On success it returns the LSN assigned to the entry by the Raft log.
+func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) {
+	mgr.wg.Add(1)
+	defer mgr.wg.Done()
+
+	// Register a waiter whose ID travels inside the proposal. The event loop uses that ID to
+	// assign the LSN (saveEntries) and to wake this call up once the entry is committed
+	// (processCommitEntries).
+	w := mgr.registry.Register()
+	defer mgr.registry.Untrack(w.ID)
+
+	message := &gitalypb.RaftEntry{
+		Id: uint64(w.ID),
+		Data: &gitalypb.RaftEntry_LogData{
+			LocalPath: []byte(logEntryPath),
+		},
+	}
+	data, err := proto.Marshal(message)
+	if err != nil {
+		return 0, fmt.Errorf("marshaling Raft message: %w", err)
+	}
+
+	ctx := mgr.ctx
+
+	// Set an optional timeout to prevent proposal processing from taking forever. This option
+	// is more useful in testing environments.
+	if mgr.options.opTimeout != 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(mgr.ctx, mgr.options.opTimeout)
+		defer cancel()
+	}
+
+	mgr.hooks.BeforePropose(logEntryPath)
+	if err := mgr.node.Propose(ctx, data); err != nil {
+		return 0, fmt.Errorf("proposing Raft message: %w", err)
+	}
+
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	case err := <-w.C:
+		return w.LSN, err
+	}
+}
+
+// CompareAndAppendLogEntry is a variant of AppendLogEntry. It appends the log entry to the write-ahead log if and only
+// if the inserting position matches the expected LSN. Raft manager doesn't implement this method. The LSN is allocated
+// by the underlying Raft engine. It cannot guarantee the inserted LSN matches before actual insertion.
+func (mgr *Manager) CompareAndAppendLogEntry(lsn storage.LSN, logEntryPath string) (storage.LSN, error) {
+	return 0, errors.New("raft manager does not support CompareAndAppendLogEntry")
+}
+
+// DeleteLogEntry deletes the log entry at the given LSN from the log. Raft manager doesn't support log entry deletion.
+// After an entry is persisted, it is then sent to other members in the raft group for acknowledgement. There is no good
+// way to withdraw this submission.
+func (mgr *Manager) DeleteLogEntry(lsn storage.LSN) error {
+	return errors.New("raft manager does not support DeleteLogEntry")
+}
+
+// NotifyNewEntries signals to the notification queue that a newly written log entry is available for consumption.
+// The send is blocking: the call returns once the notification has been placed on the queue.
+func (mgr *Manager) NotifyNewEntries() {
+	mgr.notifyQueue <- nil
+}
+
+// handleReady processes the next state signaled by etcd/raft through three main steps:
+//  1. Persist states (SoftState, HardState, and uncommitted Entries)
+//  2. Send messages to other members via Transport
+//  3. Process committed entries (entries acknowledged by the majority)
+//
+// See: https://pkg.go.dev/go.etcd.io/etcd/raft/v3#section-readme
+func (mgr *Manager) handleReady(rd *raft.Ready) error {
+	mgr.hooks.BeforeHandleReady()
+
+	// Handle volatile state updates for leadership tracking and observability
+	if err := mgr.handleSoftState(rd); err != nil {
+		return fmt.Errorf("handling soft state: %w", err)
+	}
+
+	// In https://pkg.go.dev/go.etcd.io/etcd/raft/v3#section-readme, the
+	// library recommends saving entries, hard state, and snapshots
+	// atomically. We can't currently do this because we use different
+	// backends to persist these items:
+	// - entries are appended to WAL
+	// - hard state is persisted to the KV DB
+	// - snapshots are written to a directory
+
+	// Persist new log entries to disk. These entries are not yet committed
+	// and may be superseded by entries with the same LSN but higher term.
+	// WAL will clean up any overlapping entries.
+	if err := mgr.saveEntries(rd); err != nil {
+		return fmt.Errorf("saving entries: %w", err)
+	}
+
+	// Persist essential state needed for crash recovery
+	if err := mgr.handleHardState(rd); err != nil {
+		return fmt.Errorf("handling hard state: %w", err)
+	}
+
+	// Send messages to other members in the cluster.
+	// Note: While the Raft thesis (section 10.2) suggests pipelining this step
+	// for parallel processing after disk persistence, our WAL currently serializes
+	// transactions. This optimization may become relevant when WAL supports
+	// concurrent transaction processing.
+	// Reference: https://github.com/ongardie/dissertation/blob/master/stanford.pdf
+	//
+	// The current implementation does not include Raft snapshotting or log compaction.
+	// This means the log will grow indefinitely until manually truncated.
+	//
+	// In a future implementation, periodic snapshots will allow the log to be trimmed
+	// by removing entries that have been incorporated into a snapshot.
+	// See https://gitlab.com/gitlab-org/gitaly/-/issues/6463
+	if err := mgr.sendMessages(rd); err != nil {
+		return fmt.Errorf("sending messages: %w", err)
+	}
+
+	// Process committed entries in WAL. In single-node clusters, entries will be
+	// committed immediately without network communication since there's no need for
+	// consensus with other members.
+	if err := mgr.processCommitEntries(rd); err != nil {
+		return fmt.Errorf("processing committed entries: %w", err)
+	}
+	return nil
+}
+
+// saveEntries persists new log entries to storage and handles their recording if enabled.
+// Three kinds of entries are handled:
+//   - normal entries without data (internal Raft entries) are stored as empty WAL entries;
+//   - normal entries with data carry a RaftEntry pointing at a local log entry directory, which
+//     is inserted into the WAL and whose waiter is told the assigned LSN;
+//   - config change entries are stored as WAL entries recording the LSN of the last config change.
+//
+// Any other entry type is rejected with an error.
+func (mgr *Manager) saveEntries(rd *raft.Ready) error {
+	if len(rd.Entries) == 0 {
+		return nil
+	}
+
+	// Remove in-flight events with duplicate LSNs but lower terms
+	// WAL will clean up corresponding entries on disk
+	// Events without LSNs are preserved as they haven't reached this stage
+	firstLSN := storage.LSN(rd.Entries[0].Index)
+	mgr.registry.UntrackSince(firstLSN, ErrObsoleted)
+
+	for i := range rd.Entries {
+		lsn := storage.LSN(rd.Entries[i].Index)
+
+		switch rd.Entries[i].Type {
+		case raftpb.EntryNormal:
+			if len(rd.Entries[i].Data) == 0 {
+				// Handle empty entries (typically internal Raft entries)
+				if err := mgr.storage.draftLogEntry(rd.Entries[i], func(w *wal.Entry) error {
+					return nil
+				}); err != nil {
+					// This branch stores an empty entry, not a config change; the message
+					// says so to keep failures attributable to the right entry kind.
+					return fmt.Errorf("inserting empty log entry: %w", err)
+				}
+				if err := mgr.recordEntryIfNeeded(true, lsn); err != nil {
+					return fmt.Errorf("recording log entry: %w", err)
+				}
+			} else {
+				// Handle normal entries containing RaftMessage data
+				var message gitalypb.RaftEntry
+				if err := proto.Unmarshal(rd.Entries[i].Data, &message); err != nil {
+					return fmt.Errorf("unmarshalling entry type: %w", err)
+				}
+
+				logEntryPath := string(message.GetData().GetLocalPath())
+				if err := mgr.storage.insertLogEntry(rd.Entries[i], logEntryPath); err != nil {
+					return fmt.Errorf("appending log entry: %w", err)
+				}
+				if err := mgr.recordEntryIfNeeded(false, lsn); err != nil {
+					return fmt.Errorf("recording log entry: %w", err)
+				}
+
+				// Tell the waiter registered by AppendLogEntry which LSN its entry received.
+				mgr.registry.AssignLSN(EventID(message.GetId()), lsn)
+			}
+		case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
+			// Handle configuration change entries
+			if err := mgr.storage.draftLogEntry(rd.Entries[i], func(w *wal.Entry) error {
+				marshaledValue, err := proto.Marshal(lsn.ToProto())
+				if err != nil {
+					return fmt.Errorf("marshal value: %w", err)
+				}
+				w.SetKey(KeyLastConfigChange, marshaledValue)
+				return nil
+			}); err != nil {
+				return fmt.Errorf("inserting config change log entry: %w", err)
+			}
+			if err := mgr.recordEntryIfNeeded(true, lsn); err != nil {
+				return fmt.Errorf("recording log entry: %w", err)
+			}
+		default:
+			return fmt.Errorf("raft entry type not supported: %s", rd.Entries[i].Type)
+		}
+	}
+	return nil
+}
+
+// processCommitEntries processes entries that have been committed by the Raft consensus
+// and updates the system state accordingly.
+func (mgr *Manager) processCommitEntries(rd *raft.Ready) error {
+	mgr.hooks.BeforeProcessCommittedEntries()
+
+	for i := range rd.CommittedEntries {
+		var shouldNotify bool
+
+		switch rd.CommittedEntries[i].Type {
+		case raftpb.EntryNormal:
+			var message gitalypb.RaftEntry
+			if err := proto.Unmarshal(rd.CommittedEntries[i].Data, &message); err != nil {
+				return fmt.Errorf("unmarshalling entry type: %w", err)
+			}
+
+			// Notification logic:
+			// 1. For internal entries (those NOT tracked in the registry), we notify because
+			//    the caller isn't aware of these automatically generated entries
+			// 2. For caller-issued entries (those tracked in the registry), we don't notify
+			//    since the caller already knows about these entries
+			// The Untrack() method returns true for tracked entries and unblocks the channel
+			// AppendLogEntry() is waiting on. Callers must handle concurrent modifications
+			// appropriately.
+			shouldNotify = !mgr.registry.Untrack(EventID(message.GetId()))
+
+		case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
+			if err := mgr.processConfChange(rd.CommittedEntries[i]); err != nil {
+				return fmt.Errorf("processing config change: %w", err)
+			}
+			shouldNotify = true
+
+		default:
+			return fmt.Errorf("raft entry type not supported: %s", rd.CommittedEntries[i].Type)
+		}
+
+		if shouldNotify {
+			// Best-effort notification: never block the event loop on a full queue.
+			select {
+			case mgr.notifyQueue <- nil:
+			default:
+			}
+		}
+	}
+	return nil
+}
+
+// processConfChange processes a committed config change entry: it decodes the entry (V1 or V2),
+// applies it to the Raft node, persists the resulting configuration state and signals readiness.
+func (mgr *Manager) processConfChange(entry raftpb.Entry) error {
+	var cc raftpb.ConfChangeI
+	if entry.Type == raftpb.EntryConfChange {
+		var cc1 raftpb.ConfChange
+		if err := cc1.Unmarshal(entry.Data); err != nil {
+			return fmt.Errorf("unmarshalling EntryConfChange: %w", err)
+		}
+		cc = cc1
+	} else {
+		var cc2 raftpb.ConfChangeV2
+		if err := cc2.Unmarshal(entry.Data); err != nil {
+			return fmt.Errorf("unmarshalling EntryConfChangeV2: %w", err)
+		}
+		cc = cc2
+	}
+
+	confState := mgr.node.ApplyConfChange(cc)
+	if err := mgr.storage.saveConfState(*confState); err != nil {
+		return fmt.Errorf("saving config state: %w", err)
+	}
+
+	// Signal readiness after first config change. Applies only to new clusters that have not been bootstrapped. Not
+	// needed for subsequent restarts
+	mgr.signalReady()
+	return nil
+}
+
+// sendMessages delivers pending Raft messages to other members via the transport layer.
+// Networking is not implemented yet, so any pending message results in an error.
+func (mgr *Manager) sendMessages(rd *raft.Ready) error {
+	mgr.hooks.BeforeSendMessages()
+	if len(rd.Messages) > 0 {
+		// This code path will be properly implemented when network communication is added
+		// See https://gitlab.com/gitlab-org/gitaly/-/issues/6304
+		return errors.New("networking for raft cluster is not implemented yet")
+	}
+	return nil
+}
+
+// handleSoftState processes changes to volatile state like leadership and logs significant changes.
+func (mgr *Manager) handleSoftState(rd *raft.Ready) error {
+	// A Ready batch without soft state carries no leadership information.
+	if rd.SoftState == nil {
+		return nil
+	}
+
+	// Read the previous leader before SetLeader overwrites it so the log line can show the
+	// transition.
+	previousLeaderID := mgr.leadership.GetLeaderID()
+	isLeader := rd.SoftState.RaftState == raft.StateLeader
+
+	changed, duration := mgr.leadership.SetLeader(rd.SoftState.Lead, isLeader)
+	if !changed {
+		return nil
+	}
+
+	fields := logging.Fields{
+		"raft.leader_id":           mgr.leadership.GetLeaderID(),
+		"raft.is_leader":           mgr.leadership.IsLeader(),
+		"raft.previous_leader_id":  previousLeaderID,
+		"raft.leadership_duration": duration,
+	}
+	mgr.logger.WithFields(fields).Info("leadership updated")
+
+	return nil
+}
+
+// handleHardState persists critical Raft state required for crash recovery.
+// An empty hard state means there is nothing new to persist and is skipped.
+func (mgr *Manager) handleHardState(rd *raft.Ready) error {
+	hardState := rd.HardState
+	if raft.IsEmptyHardState(hardState) {
+		return nil
+	}
+
+	err := mgr.storage.saveHardState(hardState)
+	if err != nil {
+		return fmt.Errorf("saving hard state: %w", err)
+	}
+
+	return nil
+}
+
+// recordEntryIfNeeded records log entries when entry recording is enabled,
+// typically used for testing and debugging.
+func (mgr *Manager) recordEntryIfNeeded(fromRaft bool, lsn storage.LSN) error {
+	// Recording is optional; without a recorder there is nothing to do.
+	if mgr.EntryRecorder == nil {
+		return nil
+	}
+
+	logEntry, err := mgr.storage.readLogEntry(lsn)
+	if err != nil {
+		return fmt.Errorf("reading log entry: %w", err)
+	}
+	mgr.EntryRecorder.Record(fromRaft, lsn, logEntry)
+	return nil
+}
+
+// signalReady marks the manager as ready, unblocking the caller of Initialize() without an error.
+func (mgr *Manager) signalReady() {
+	mgr.ready.set(nil)
+}
+
+// signalError reports err through the ready signal, unblocking the caller of Initialize() with it.
+func (mgr *Manager) signalError(err error) {
+	mgr.ready.set(err)
+}
+
+// Compile-time check that Manager implements storage.LogManager.
+var _ = (storage.LogManager)(&Manager{})
diff --git a/internal/gitaly/storage/raftmgr/manager_test.go b/internal/gitaly/storage/raftmgr/manager_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..cb9658e5a00cf5ff67d80d51cc9c03899499da1d
--- /dev/null
+++ b/internal/gitaly/storage/raftmgr/manager_test.go
@@ -0,0 +1,1197 @@
+package raftmgr
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+// raftConfigsForTest returns a Raft configuration with short election and heartbeat settings
+// and a fresh temporary snapshot directory.
+func raftConfigsForTest(t *testing.T) config.Raft {
+	// Speed up initial election overhead in the test setup
+	return config.Raft{
+		Enabled:         true,
+		ClusterID:       "test-cluster",
+		ElectionTicks:   5,
+		HeartbeatTicks:  2,
+		RTTMilliseconds: 100,
+		SnapshotDir:     testhelper.TempDir(t),
+	}
+}
+
+// TestManager_Initialize covers manager construction and Initialize(): the success path, the
+// rejection of a disabled Raft config, double initialization and the readiness timeout.
+func TestManager_Initialize(t *testing.T) {
+	t.Parallel()
+
+	t.Run("succeeds when raft is enabled", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		logger := testhelper.NewLogger(t)
+		raftCfg := raftConfigsForTest(t)
+
+		storageName := cfg.Storages[0].Name
+		partitionID := storage.PartitionID(1)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg, logger)
+		posTracker := log.NewPositionTracker()
+		recorder := NewEntryRecorder()
+
+		// Create a raft storage
+		raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics())
+		require.NoError(t, err)
+
+		mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder))
+		require.NoError(t, err)
+
+		// Initialize the manager
+		err = mgr.Initialize(ctx, 0)
+		require.NoError(t, err)
+
+		// Verify that the manager is properly initialized
+		require.True(t, mgr.started)
+		require.NotNil(t, mgr.node)
+
+		// Verify that the first config change is recorded
+		// After initialization, Raft typically creates a config change
+		// entry to establish the initial configuration
+		require.Eventually(t, func() bool {
+			return recorder.Len() > 0
+		}, 5*time.Second, 10*time.Millisecond, "expected at least one entry to be recorded")
+
+		// Verify at least one entry from Raft was recorded (typically a config change)
+		raftEntries := recorder.FromRaft()
+		require.NotEmpty(t, raftEntries, "expected at least one Raft entry after initialization")
+
+		// Close the manager
+		require.NoError(t, mgr.Close())
+	})
+
+	t.Run("fails when raft is not enabled", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		logger := testhelper.NewLogger(t)
+		raftCfg := raftConfigsForTest(t)
+		raftCfg.Enabled = false
+
+		storageName := cfg.Storages[0].Name
+		partitionID := storage.PartitionID(1)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg, logger)
+		posTracker := log.NewPositionTracker()
+
+		raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics())
+		require.NoError(t, err)
+
+		// Configure manager with Raft disabled
+		mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger)
+		require.Nil(t, mgr)
+		require.ErrorContains(t, err, "raft is not enabled")
+	})
+
+	t.Run("fails when manager is reused", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		logger := testhelper.NewLogger(t)
+		raftCfg := raftConfigsForTest(t)
+
+		storageName := cfg.Storages[0].Name
+		partitionID := storage.PartitionID(1)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg, logger)
+		posTracker := log.NewPositionTracker()
+
+		// Create a raft storage
+		raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics())
+		require.NoError(t, err)
+
+		mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger)
+		require.NoError(t, err)
+
+		// First initialization should succeed
+		err = mgr.Initialize(ctx, 0)
+		require.NoError(t, err)
+
+		// Second initialization should fail
+		err = mgr.Initialize(ctx, 0)
+		require.EqualError(t, err, fmt.Sprintf("raft manager for partition %q already started", partitionID))
+
+		require.NoError(t, mgr.Close())
+	})
+
+	t.Run("fail waiting for raft group to be ready", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		logger := testhelper.NewLogger(t)
+		raftCfg := raftConfigsForTest(t)
+
+		storageName := cfg.Storages[0].Name
+		partitionID := storage.PartitionID(1)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg, logger)
+		posTracker := log.NewPositionTracker()
+		recorder := NewEntryRecorder()
+
+		// Create a raft storage
+		raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics())
+		require.NoError(t, err)
+
+		mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder))
+		require.NoError(t, err)
+
+		// Block the event loop before it handles its first Ready batch so the manager can never
+		// signal readiness until the channel is closed.
+		releaseReady := make(chan struct{})
+		mgr.hooks.BeforeHandleReady = func() {
+			<-releaseReady
+		}
+
+		// Initialize the manager
+		err = mgr.Initialize(ctx, 0)
+		require.ErrorIs(t, err, ErrReadyTimeout)
+
+		close(releaseReady)
+		require.NoError(t, mgr.Close())
+	})
+}
+
+// TestManager_AppendLogEntry covers appending log entries through an initialized manager:
+// single, sequential and concurrent appends, plus the operation-timeout and cancellation paths.
+func TestManager_AppendLogEntry(t *testing.T) {
+	t.Parallel()
+
+	// setup builds a Raft storage and an initialized manager with an entry recorder attached.
+	setup := func(t *testing.T, ctx context.Context, cfg config.Cfg) (*Manager, *EntryRecorder) {
+		t.Helper()
+
+		logger := testhelper.NewLogger(t)
+		raftCfg := raftConfigsForTest(t)
+
+		storageName := cfg.Storages[0].Name
+		partitionID := storage.PartitionID(1)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg, logger)
+		posTracker := log.NewPositionTracker()
+		recorder := NewEntryRecorder()
+
+		raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics())
+		require.NoError(t, err)
+
+		mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder))
+		require.NoError(t, err)
+
+		err = mgr.Initialize(ctx, 0)
+		require.NoError(t, err)
+
+		return mgr, recorder
+	}
+
+	// createLogEntry writes a log entry directory whose manifest sets "test-key" to content and
+	// returns the directory path. It uses require, so it must only be called from the goroutine
+	// running the test.
+	createLogEntry := func(t *testing.T, ctx context.Context, content string) string {
+		t.Helper()
+
+		entryDir := testhelper.TempDir(t)
+		entry := wal.NewEntry(entryDir)
+		entry.SetKey([]byte("test-key"), []byte(content))
+
+		// Create a few files in the directory to simulate actual log entry data
+		for i := 1; i <= 3; i++ {
+			filePath := filepath.Join(entryDir, fmt.Sprintf("file-%d", i))
+			require.NoError(t, os.WriteFile(filePath, []byte(content), 0o644))
+		}
+
+		// Write the manifest
+		require.NoError(t, wal.WriteManifest(ctx, entryDir, &gitalypb.LogEntry{
+			Operations: entry.Operations(),
+		}))
+
+		return entryDir
+	}
+
+	t.Run("append single log entry", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		mgr, recorder := setup(t, ctx, cfg)
+		defer func() {
+			require.NoError(t, mgr.Close())
+		}()
+
+		logEntryPath := createLogEntry(t, ctx, "test-content-1")
+		lsn, err := mgr.AppendLogEntry(logEntryPath)
+		require.NoError(t, err)
+		require.Greater(t, lsn, storage.LSN(0), "expected a valid LSN")
+
+		// Verify that the log entry was recorded
+		require.Eventually(t, func() bool {
+			// The entry might be recorded with an offset due to Raft internal entries
+			return recorder.Len() >= 3
+		}, 5*time.Second, 10*time.Millisecond, "expected log entry to be recorded")
+
+		// Check that our entry is not marked as coming from Raft
+		require.False(t, recorder.IsFromRaft(lsn), "expected user-submitted entry not to be from Raft")
+
+		// Verify entry content
+		logEntry, err := wal.ReadManifest(mgr.GetEntryPath(lsn))
+		require.NoError(t, err)
+		require.NotNil(t, logEntry)
+		require.Len(t, logEntry.GetOperations(), 1)
+		require.Equal(t, []byte("test-key"), logEntry.GetOperations()[0].GetSetKey().GetKey())
+		require.Equal(t, []byte("test-content-1"), logEntry.GetOperations()[0].GetSetKey().GetValue())
+	})
+
+	t.Run("append multiple log entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		mgr, recorder := setup(t, ctx, cfg)
+		defer func() {
+			require.NoError(t, mgr.Close())
+		}()
+
+		// Create and append multiple log entries
+		var lsns []storage.LSN
+
+		for i := 1; i <= 3; i++ {
+			logEntryPath := createLogEntry(t, ctx, fmt.Sprintf("test-content-%d", i))
+			lsn, err := mgr.AppendLogEntry(logEntryPath)
+			require.NoError(t, err)
+			lsns = append(lsns, lsn)
+		}
+
+		// Verify that all log entries were recorded
+		require.Eventually(t, func() bool {
+			return recorder.Len() >= 3
+		}, 5*time.Second, 10*time.Millisecond, "expected all log entries to be recorded")
+
+		// Verify entries are in order
+		require.IsIncreasing(t, lsns, "expected increasing LSNs")
+
+		for i := 0; i < 3; i++ {
+			logEntry, err := wal.ReadManifest(mgr.GetEntryPath(lsns[i]))
+			require.NoError(t, err)
+			require.NotNil(t, logEntry)
+			require.Len(t, logEntry.GetOperations(), 1)
+			require.Equal(t, []byte("test-key"), logEntry.GetOperations()[0].GetSetKey().GetKey())
+			require.Equal(t, []byte(fmt.Sprintf("test-content-%d", i+1)), logEntry.GetOperations()[0].GetSetKey().GetValue())
+		}
+	})
+
+	t.Run("append multiple log entries concurrently", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		mgr, _ := setup(t, ctx, cfg)
+		defer func() {
+			require.NoError(t, mgr.Close())
+		}()
+
+		// Number of concurrent entries to append. We use a gate so that all log entries are appended at the
+		// same time. We need to verify if the event loop works correctly when handling multiple entries in the
+		// same batch. However, the order is non-deterministic. There's also no guarantee (although very likely)
+		// a batch contains more than one entry.
+		numEntries := 20
+
+		// Prepare every log entry up front on the test goroutine. createLogEntry asserts with
+		// require, which ends in t.FailNow, and FailNow must not be called from goroutines
+		// spawned by the test. It also keeps filesystem setup out of the gated section, so the
+		// appends start closer together.
+		logEntryPaths := make([]string, numEntries)
+		for i := range logEntryPaths {
+			logEntryPaths[i] = createLogEntry(t, ctx, fmt.Sprintf("test-content-%d", i+1))
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(numEntries)
+
+		// Create a starting gate to coordinate concurrent execution
+		startGate := make(chan struct{})
+
+		// appendResult carries the outcome of one AppendLogEntry call back to the test goroutine,
+		// where all assertions happen. idx is the 1-based index used in the entry's content.
+		type appendResult struct {
+			lsn storage.LSN
+			err error
+			idx int
+		}
+		results := make(chan appendResult, numEntries)
+
+		// Launch goroutines to append entries concurrently
+		for i := 1; i <= numEntries; i++ {
+			go func(idx int) {
+				defer wg.Done()
+
+				// Wait for the starting gate
+				<-startGate
+
+				lsn, err := mgr.AppendLogEntry(logEntryPaths[idx-1])
+
+				// Send the result back
+				results <- appendResult{lsn: lsn, err: err, idx: idx}
+			}(i)
+		}
+
+		// Start all goroutines at once
+		close(startGate)
+
+		// Collect all results
+		var lsns []storage.LSN
+		lsnMap := make(map[int]storage.LSN) // Maps index to LSN for content verification
+
+		wg.Wait()
+		close(results)
+
+		for res := range results {
+			require.NoError(t, res.err, "AppendLogEntry should not fail for entry %d", res.idx)
+			require.Greater(t, res.lsn, storage.LSN(0), "should return a valid LSN for entry %d", res.idx)
+			lsns = append(lsns, res.lsn)
+			lsnMap[res.idx] = res.lsn
+		}
+
+		// Verify entries are ordered when sorted
+		sortedLSNs := make([]storage.LSN, len(lsns))
+		copy(sortedLSNs, lsns)
+		sort.Slice(sortedLSNs, func(i, j int) bool {
+			return sortedLSNs[i] < sortedLSNs[j]
+		})
+
+		require.IsIncreasing(t, sortedLSNs, "LSNs should be unique and increasing")
+
+		// Verify each entry's content matches its index
+		for idx, lsn := range lsnMap {
+			logEntry, err := wal.ReadManifest(mgr.GetEntryPath(lsn))
+			require.NoError(t, err)
+			require.NotNil(t, logEntry)
+			require.Len(t, logEntry.GetOperations(), 1)
+			require.Equal(t, []byte("test-key"), logEntry.GetOperations()[0].GetSetKey().GetKey())
+			require.Equal(t, []byte(fmt.Sprintf("test-content-%d", idx)), logEntry.GetOperations()[0].GetSetKey().GetValue())
+		}
+	})
+
+	t.Run("operation timeout", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		logger := testhelper.NewLogger(t)
+		raftCfg := raftConfigsForTest(t)
+
+		storageName := cfg.Storages[0].Name
+		partitionID := storage.PartitionID(1)
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg, logger)
+		posTracker := log.NewPositionTracker()
+		recorder := NewEntryRecorder()
+
+		// Create a raft storage
+		raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics())
+		require.NoError(t, err)
+
+		// Create manager with very short operation timeout
+		mgr, err := NewManager(
+			storageName,
+			partitionID,
+			raftCfg,
+			raftStorage,
+			logger,
+			WithEntryRecorder(recorder),
+			WithOpTimeout(1*time.Nanosecond), // Set a very short timeout to force failure
+		)
+		require.NoError(t, err)
+
+		// Initialize the manager
+		err = mgr.Initialize(ctx, 0)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, mgr.Close())
+		}()
+
+		// Attempting to append should time out
+		logEntryPath := createLogEntry(t, ctx, "timeout-test")
+		_, err = mgr.AppendLogEntry(logEntryPath)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "context deadline exceeded")
+	})
+
+	t.Run("context canceled", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		// The manager is initialized with its own cancellable context so it can be canceled
+		// independently of the context used to build the log entry.
+		cancelCtx, cancel := context.WithCancel(testhelper.Context(t))
+		mgr, _ := setup(t, cancelCtx, cfg)
+		defer func() {
+			require.NoError(t, mgr.Close())
+		}()
+
+		// Cancel the context before append
+		cancel()
+
+		// Attempt to append should fail with context canceled
+		logEntryPath := createLogEntry(t, ctx, "cancel-test")
+		_, err := mgr.AppendLogEntry(logEntryPath)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "context canceled")
+	})
+}
+
+func TestManager_AppendLogEntry_CrashRecovery(t *testing.T) {
t.Parallel() + + // testEnv encapsulates the test environment for raft manager crash tests + type testEnv struct { + mgr *Manager + db keyvalue.Transactioner + dbMgr *databasemgr.DBManager + stagingDir string + stateDir string + cfg config.Cfg + recorder *EntryRecorder + baseLSN storage.LSN + storageName string + partitionID storage.PartitionID + } + + // Helper to create a test log entry + createTestLogEntry := func(t *testing.T, ctx context.Context, content string) string { + t.Helper() + + entryDir := testhelper.TempDir(t) + entry := wal.NewEntry(entryDir) + entry.SetKey([]byte("test-key"), []byte(content)) + + // Write the manifest + require.NoError(t, wal.WriteManifest(ctx, entryDir, &gitalypb.LogEntry{ + Operations: entry.Operations(), + })) + + return entryDir + } + + // Helper for setting up a test environment + setupTest := func(t *testing.T, ctx context.Context, partitionID storage.PartitionID, setupFuncs ...func(*Manager)) testEnv { + t.Helper() + + cfg := testcfg.Build(t) + raftCfg := raftConfigsForTest(t) + logger := testhelper.NewLogger(t) + + storageName := cfg.Storages[0].Name + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + recorder := NewEntryRecorder() + + dbMgr := openTestDB(t, ctx, cfg, logger) + t.Cleanup(dbMgr.Close) + + db, err := dbMgr.GetDB(cfg.Storages[0].Name) + require.NoError(t, err) + + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), NewMetrics()) + require.NoError(t, err) + + // Configure manager + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder)) + require.NoError(t, err) + + for _, f := range setupFuncs { + f(mgr) + } + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + + // Create a log entry and append it to establish a baseline + logEntryPath := createTestLogEntry(t, ctx, "base-content") + lsn, err := 
mgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + require.Greater(t, lsn, storage.LSN(0)) + + return testEnv{ + mgr: mgr, + db: db, + dbMgr: dbMgr, + stagingDir: stagingDir, + stateDir: stateDir, + cfg: cfg, + recorder: recorder, + baseLSN: lsn, + storageName: storageName, + partitionID: partitionID, + } + } + + // Helper to create a recovery manager -- a new instance of the Raft Manager that picks resumes from where the + // crashed manager left off. + createRecoveryManager := func(t *testing.T, ctx context.Context, env testEnv, lastLSN storage.LSN) *Manager { + t.Helper() + + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + // Get a new DB connection from the existing DB manager + dbMgr := env.dbMgr + db, err := dbMgr.GetDB(env.cfg.Storages[0].Name) + require.NoError(t, err) + + raftStorage, err := NewStorage(raftCfg, logger, env.storageName, env.partitionID, db, env.stagingDir, env.stateDir, &mockConsumer{}, log.NewPositionTracker(), NewMetrics()) + require.NoError(t, err) + + recoveryMgr, err := NewManager(env.storageName, env.partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(env.recorder)) + require.NoError(t, err) + + // Initialize with the last known LSN + err = recoveryMgr.Initialize(ctx, lastLSN) + require.NoError(t, err) + + return recoveryMgr + } + + // Helper to verify recovery when change is NOT expected to be persisted or retained + verifyNonPersistingRecovery := func(t *testing.T, ctx context.Context, recoveryMgr *Manager, baseLSN storage.LSN, crashContent string) { + t.Helper() + + // First append a new entry - this is crucial because it may trigger overwriting + // of any uncommitted entries that might exist from before the crash + logEntryPath := createTestLogEntry(t, ctx, "recovery-content") + newLSN, err := recoveryMgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + + // Get the recorder which contains all recorded entries + recorder := recoveryMgr.EntryRecorder + require.NotNil(t, recorder, 
"EntryRecorder must be configured for verification") + + // Check all entries in the recorder after the base LSN + // We should NOT find our crash content in any user (non-Raft) entries + crashEntryFound := false + + // Examine all entries from baseLSN+1 to newLSN + for lsn := baseLSN + 1; lsn <= newLSN; lsn++ { + // Skip Raft internal entries - we only care about user entries + if recorder.IsFromRaft(lsn) { + continue + } + + // For user entries, check if the content matches our crash content + entry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(lsn)) + if err != nil { + continue // Skip entries that can't be read + } + + // Check if this entry contains our crash content + for _, op := range entry.GetOperations() { + if op.GetSetKey() != nil && + string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == crashContent { + crashEntryFound = true + break + } + } + + if crashEntryFound { + break + } + } + + // Our crash content should NOT be persisted anywhere after recovery and new append + require.False(t, crashEntryFound, + "entry with crash content '%s' should not exist after recovery and a new append", crashContent) + + // Verify our recovery entry has the expected content + newEntry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(newLSN)) + require.NoError(t, err) + + // Check for the specific recovery content + recoveryContentFound := false + for _, op := range newEntry.GetOperations() { + if op.GetSetKey() != nil && + string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == "recovery-content" { + recoveryContentFound = true + break + } + } + require.True(t, recoveryContentFound, "recovery content should be found in new entry") + } + + // Helper to verify recovery when change IS expected to be persisted and retained + verifyPersistingRecovery := func(t *testing.T, ctx context.Context, recoveryMgr *Manager, baseLSN storage.LSN, crashContent string) { + t.Helper() + + // First append a new entry - 
this will tell us if the crash entry is truly committed + // If the crash entry wasn't actually committed, it would be overwritten now + logEntryPath := createTestLogEntry(t, ctx, "recovery-content") + newLSN, err := recoveryMgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + + // Get the recorder which contains all recorded entries + recorder := recoveryMgr.EntryRecorder + require.NotNil(t, recorder, "EntryRecorder must be configured for verification") + + // Now check all user entries to see if our crash content survived the append of a new entry + crashEntryLSN := storage.LSN(0) + + // Examine all entries from baseLSN+1 to newLSN + for lsn := baseLSN + 1; lsn < newLSN; lsn++ { + // Skip Raft internal entries - we only care about user entries + if recorder.IsFromRaft(lsn) { + continue + } + + // For user entries, check if the content matches our crash content + entry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(lsn)) + if err != nil { + continue // Skip entries that can't be read + } + + // Check if this entry contains our crash content + for _, op := range entry.GetOperations() { + if op.GetSetKey() != nil && + string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == crashContent { + crashEntryLSN = lsn + break + } + } + + if crashEntryLSN != 0 { + break + } + } + + // Our crash content should still be persisted somewhere even after a new append + require.NotEqual(t, storage.LSN(0), crashEntryLSN, + "committed entry with crash content '%s' should exist even after a new append", crashContent) + + // Verify our recovery entry is also there and has the expected content + newEntry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(newLSN)) + require.NoError(t, err) + require.False(t, recorder.IsFromRaft(newLSN), "recovery entry should not be from Raft") + + // Check for the specific recovery content + recoveryContentFound := false + for _, op := range newEntry.GetOperations() { + if op.GetSetKey() != nil && + 
string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == "recovery-content" { + recoveryContentFound = true + break + } + } + require.True(t, recoveryContentFound, "recovery content should be found in new entry") + } + + // NOTE(review): this cleanup is currently a no-op; its body contains only comments. + t.Cleanup(func() { + // The individual test cases will close their own managers and DB connections. + // TODO: either close shared DB managers here or remove this empty cleanup. + }) + + t.Run("AppendLogEntry crash during propose", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + env := setupTest(t, ctx, storage.PartitionID(1)) + + // Set up hook to panic during propose + env.mgr.hooks.BeforePropose = func(logEntryPath string) { + panic("simulated crash during propose") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-propose" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + // Try to append - should panic + require.PanicsWithValue(t, "simulated crash during propose", func() { + _, _ = env.mgr.AppendLogEntry(logEntryPath) + }) + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change should NOT be persisted + verifyNonPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during commit entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(2)) + + // Set up hook to panic during commit entries + env.mgr.hooks.BeforeProcessCommittedEntries = func() { + panic("simulated crash during commit entries") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-commit" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + // Try to append - should fail with 
ErrManagerStopped + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during commit entries") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change SHOULD be persisted + // Even though client received an error, the entry was persisted before the crash + verifyPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during node advance", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + // The trigger prevents node advance from being triggered during first election. The hook must be + // inserted before Initialize(). Otherwise, we'll have a race. + var trigger atomic.Bool + env := setupTest(t, ctx, storage.PartitionID(3), func(mgr *Manager) { + mgr.hooks.BeforeNodeAdvance = func() { + if trigger.Load() { + panic("simulated crash during node advance") + } + } + }) + + // Create a test entry that will trigger the panic + crashContent := "crash-during-node-advance" + trigger.Store(true) + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + // Try to append - should return nil error since entry is committed before crash + lsn, err := env.mgr.AppendLogEntry(logEntryPath) + // At this point, the log entry is committed. 
Callers should receive the result + require.NoError(t, err, "client should receive success before the crash") + require.Greater(t, lsn, env.baseLSN, "should return valid LSN before crash") + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during node advance") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change SHOULD be persisted and client already got success + verifyPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during handle ready", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(4)) + + // Set up hook to panic during handle ready + env.mgr.hooks.BeforeHandleReady = func() { + panic("simulated crash during handle ready") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-handle-ready" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during handle ready") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change should NOT be persisted + // In a single-node setup, this behaves like a propose crash (change not persisted) + // In a multi-node setup, this could behave differently as the entry 
might already + // be replicated to other nodes before the crash + verifyNonPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during insert log entry", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(5)) + + // Set up hook to panic during insert log entry + env.mgr.storage.hooks.BeforeInsertLogEntry = func(index uint64) { + panic("simulated crash during insert log entry") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-insert" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during insert log entry") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change should NOT be persisted + // In a single-node setup, this behaves like a propose crash (change not persisted) + // In a multi-node setup, this could behave differently as the entry might already + // be replicated to other nodes before the crash + verifyNonPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during save hard state", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(6)) + + // Set up hook to panic during save hard state + env.mgr.storage.hooks.BeforeSaveHardState = func() { + panic("simulated crash during save hard state") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-save-hard-state" + logEntryPath 
:= createTestLogEntry(t, ctx, crashContent) + + // The caller is expected to receive ErrManagerStopped because of the simulated crash. + // Unlike a propose crash, the entry itself is expected to survive; this is verified + // after recovery below. + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during save hard state") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // When a crash occurs during saveHardState, the log entry is still persisted, unlike crashes + // during propose, handle ready, or insert log entry. This is because the entry is already + // physically written to disk in Storage.insertLogEntry (including fsync calls) before + // saveHardState is invoked. This behavior follows the guideline: + // https://pkg.go.dev/go.etcd.io/etcd/raft/v3#section-readme. + // The hard state update merely records metadata about what's committed, but doesn't affect the entry's + // persistence. During recovery, Raft will find the entry on disk even though the hard state wasn't + // updated to reflect it. In a single-node setup, this entry will be considered valid and retained + // because there's no conflicting entry with a higher term from another node. In multi-node setups, this + // behavior might differ as entries could be overwritten by entries with higher terms from the new + // leader. 
+ verifyPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry multiple crash and recovery cycle", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(7)) + + // Keep track of LSNs for each successful append + lastKnownLSN := env.baseLSN + + // First recovery cycle - crash during propose + require.NoError(t, env.mgr.Close()) + firstRecoveryMgr := createRecoveryManager(t, ctx, env, lastKnownLSN) + + // Add one successful entry to advance the LSN + logEntryPath1 := createTestLogEntry(t, ctx, "first-recovery-success") + lsn1, err := firstRecoveryMgr.AppendLogEntry(logEntryPath1) + require.NoError(t, err) + require.Greater(t, lsn1, lastKnownLSN) + lastKnownLSN = lsn1 + + // Set up hook to crash during propose + firstRecoveryMgr.hooks.BeforePropose = func(logEntryPath string) { + panic("simulated crash during first recovery") + } + + // Attempt that will crash + logEntryPath2 := createTestLogEntry(t, ctx, "first-recovery-crash") + require.PanicsWithValue(t, "simulated crash during first recovery", func() { + _, _ = firstRecoveryMgr.AppendLogEntry(logEntryPath2) + }) + + // Close the manager only, keep the DB manager + require.NoError(t, firstRecoveryMgr.Close()) + + // Second recovery cycle - crash during commit + secondRecoveryMgr := createRecoveryManager(t, ctx, env, lastKnownLSN) + + // Add one successful entry to advance the LSN + logEntryPath3 := createTestLogEntry(t, ctx, "second-recovery-success") + lsn2, err := secondRecoveryMgr.AppendLogEntry(logEntryPath3) + require.NoError(t, err) + require.Greater(t, lsn2, lastKnownLSN) + lastKnownLSN = lsn2 + + // Set up hook to crash during commit + secondRecoveryMgr.hooks.BeforeProcessCommittedEntries = func() { + panic("simulated crash during second recovery") + } + + // Attempt that will crash + logEntryPath4 := createTestLogEntry(t, ctx, "second-recovery-crash") + _, err = 
secondRecoveryMgr.AppendLogEntry(logEntryPath4) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-secondRecoveryMgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during second recovery") + + // Close the manager only, keep the DB manager + require.NoError(t, secondRecoveryMgr.Close()) + + // Final recovery - verify system state after multiple crashes + finalRecoveryMgr := createRecoveryManager(t, ctx, env, lastKnownLSN) + + // For commit crash, the crashed entry should be persisted + require.Greater(t, finalRecoveryMgr.AppendedLSN(), lastKnownLSN, + "commit entry crash should persist the change") + + // Verify crashed entry content + entry, err := wal.ReadManifest(finalRecoveryMgr.GetEntryPath(lastKnownLSN + 1)) + require.NoError(t, err) + require.Contains(t, entry.String(), "second-recovery-crash", + "entry should contain content from crash during commit") + + // Should be able to append new entries after recovery + logEntryPath5 := createTestLogEntry(t, ctx, "final-recovery-success") + finalLSN, err := finalRecoveryMgr.AppendLogEntry(logEntryPath5) + require.NoError(t, err) + require.Greater(t, finalLSN, lastKnownLSN+1) + + require.NoError(t, finalRecoveryMgr.Close()) + }) +} + +func TestManager_Close(t *testing.T) { + t.Parallel() + + t.Run("close initialized manager", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := config.Raft{ + Enabled: true, + RTTMilliseconds: 100, + ElectionTicks: 10, + HeartbeatTicks: 1, + SnapshotDir: testhelper.TempDir(t), + } + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + // Create a 
raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + + // Close the manager + err = mgr.Close() + require.NoError(t, err, "expected Close to succeed") + + // Second close should still work (idempotent) + err = mgr.Close() + require.NoError(t, err, "expected second Close to succeed") + }) + + t.Run("close uninitialized manager", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := config.Raft{ + Enabled: true, + RTTMilliseconds: 100, + ElectionTicks: 10, + HeartbeatTicks: 1, + SnapshotDir: testhelper.TempDir(t), + } + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.NoError(t, err) + + // Close without initializing + err = mgr.Close() + require.NoError(t, err, "expected Close to succeed even without initialization") + }) + + t.Run("verify raft internal entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db 
:= getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + recorder := NewEntryRecorder() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder)) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + defer func() { + require.NoError(t, mgr.Close()) + }() + + // Get entries generated by Raft internal processes + raftEntries := recorder.FromRaft() + require.NotEmpty(t, raftEntries, "expected some internal entries generated by Raft") + + // Verify that at least one entry is a config change (usually the first one) + foundConfigChange := false + for _, entry := range raftEntries { + // Look for entries that might be related to configuration + for _, op := range entry.GetOperations() { + if op.GetSetKey() != nil && string(op.GetSetKey().GetKey()) == string(KeyLastConfigChange) { + foundConfigChange = true + break + } + } + if foundConfigChange { + break + } + } + require.True(t, foundConfigChange, "expected to find at least one config change entry") + }) +} + +func TestManager_NotImplementedLogMethods(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + // Configure manager with Raft enabled + raftCfg := config.Raft{ + Enabled: true, + RTTMilliseconds: 100, + ElectionTicks: 10, + HeartbeatTicks: 1, + SnapshotDir: testhelper.TempDir(t), + } + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, 
storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + defer func() { + require.NoError(t, mgr.Close()) + }() + + // Test CompareAndAppendLogEntry - should not be implemented + _, err = mgr.CompareAndAppendLogEntry(1, "/path/to/log") + require.ErrorContains(t, err, "raft manager does not support CompareAndAppendLogEntry") + + // Test DeleteLogEntry - should not be implemented + err = mgr.DeleteLogEntry(1) + require.ErrorContains(t, err, "raft manager does not support DeleteLogEntry") +} diff --git a/internal/gitaly/storage/raftmgr/snapshot_metrics.go b/internal/gitaly/storage/raftmgr/metrics.go similarity index 77% rename from internal/gitaly/storage/raftmgr/snapshot_metrics.go rename to internal/gitaly/storage/raftmgr/metrics.go index ac48c8e8395c3a8c9b94efe24ee39d625d429c24..6dd2b8d3d6f39daf8eb5187074e129e3055dd6f0 100644 --- a/internal/gitaly/storage/raftmgr/snapshot_metrics.go +++ b/internal/gitaly/storage/raftmgr/metrics.go @@ -2,7 +2,7 @@ package raftmgr import "github.com/prometheus/client_golang/prometheus" -// Metrics contains the unscoped collected by Snapshotter. +// Metrics contains the unscoped metrics collected by Raft activities. type Metrics struct { snapSaveSec *prometheus.HistogramVec } @@ -32,15 +32,15 @@ func (m *Metrics) Collect(metrics chan<- prometheus.Metric) { m.snapSaveSec.Collect(metrics) } -// SnapshotterMetrics are metrics scoped for a specific storage -type SnapshotterMetrics struct { +// RaftMetrics are metrics scoped for a specific storage +type RaftMetrics struct { snapSaveSec prometheus.Observer } -// Scope returns snapshotter metrics scoped for a specific storage. 
-func (m *Metrics) Scope(storage string) SnapshotterMetrics { +// Scope returns Raft metrics scoped for a specific storage. +func (m *Metrics) Scope(storage string) RaftMetrics { labels := prometheus.Labels{"storage": storage} - return SnapshotterMetrics{ + return RaftMetrics{ snapSaveSec: m.snapSaveSec.With(labels), } } diff --git a/internal/gitaly/storage/raftmgr/snapshotter.go b/internal/gitaly/storage/raftmgr/snapshotter.go index e58726abe92111c520503be81a41bc4e5c99e46c..4ecb783062346df086c71be04285a7c914aa2bf6 100644 --- a/internal/gitaly/storage/raftmgr/snapshotter.go +++ b/internal/gitaly/storage/raftmgr/snapshotter.go @@ -40,7 +40,7 @@ type RaftSnapshotter struct { sync.Mutex logger logging.Logger dir string - metrics SnapshotterMetrics + metrics RaftMetrics } // Snapshotter is an interface to implement snapshotting in raft @@ -50,7 +50,7 @@ type Snapshotter interface { } // NewRaftSnapshotter creates a new Snapshotter -func NewRaftSnapshotter(cfg config.Raft, logger logging.Logger, metrics SnapshotterMetrics) (Snapshotter, error) { +func NewRaftSnapshotter(cfg config.Raft, logger logging.Logger, metrics RaftMetrics) (Snapshotter, error) { logger = logger.WithField("component", "raft.snapshotter") logger.Info("Initializing Raft Snapshotter") diff --git a/internal/gitaly/storage/raftmgr/snapshotter_test.go b/internal/gitaly/storage/raftmgr/snapshotter_test.go index 8d5ba51d8d14fe079df38406e27e83beb97fad13..15f745cf98990588217d5e31b5efd3fade2ceab6 100644 --- a/internal/gitaly/storage/raftmgr/snapshotter_test.go +++ b/internal/gitaly/storage/raftmgr/snapshotter_test.go @@ -2,6 +2,8 @@ package raftmgr import ( "bufio" + "bytes" + "context" "os" "path/filepath" "sync" @@ -9,28 +11,22 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/archive" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/protobuf/encoding/protodelim" ) func TestRaftSnapshotter_materializeSnapshot(t *testing.T) { t.Parallel() - if !testhelper.IsWALEnabled() { - t.Skip(`Transactions must be enabled for raft snapshots to work.`) - } - - // Setup partition with transaction enabled ctx := testhelper.Context(t) cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ Raft: config.Raft{ @@ -38,53 +34,44 @@ func TestRaftSnapshotter_materializeSnapshot(t *testing.T) { }, })) logger := testhelper.NewLogger(t) - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - // Setup factories - cmdFactory := gittest.NewCommandFactory(t, cfg) - locator := config.NewLocator(cfg) - localRepoFactory := localrepo.NewFactory(logger, locator, cmdFactory, catfileCache) - partitionFactory := partition.NewFactory(cmdFactory, localRepoFactory, partition.NewMetrics(nil), nil) - storageName := cfg.Storages[0].Name storagePath := cfg.Storages[0].Path - // Setup db mgr and storage mgr - dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), 
logger) - require.NoError(t, err) - defer dbMgr.Close() - storageMgr, err := storagemgr.NewStorageManager( - logger, - cfg.Storages[0].Name, - cfg.Storages[0].Path, - dbMgr, - partitionFactory, - 1, - storagemgr.NewMetrics(cfg.Prometheus), - ) - require.NoError(t, err) - defer storageMgr.Close() - - // Retrieve partition from storage - p, err := storageMgr.GetPartition(ctx, storage.PartitionID(1)) - require.NoError(t, err) - // Create repo so partition is not empty repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ SkipCreationViaService: true, Storage: config.Storage{Name: storageName, Path: storagePath}, }) - metrics := NewMetrics().Scope("default") - // Setup snapshotter - s, err := NewRaftSnapshotter(cfg.Raft, logger, metrics) + // Add some KV entries to the DB so that the collected DB is not empty. + db := getTestDBManager(t, ctx, cfg, logger) + dbTxn := db.NewTransaction(true) + require.NoError(t, dbTxn.Set([]byte("hello"), []byte("world"))) + require.NoError(t, dbTxn.Commit()) + + // Setup a proper snapshot with partition snapshot manager + snapshotManager, err := snapshot.NewManager( + logger, + storagePath, + testhelper.TempDir(t), + snapshot.NewMetrics().Scope(storageName), + ) require.NoError(t, err) + defer testhelper.MustClose(t, snapshotManager) - // Begin transaction on partition - txn, err := p.Begin(ctx, storage.BeginOptions{ - RelativePaths: []string{repo.GetRelativePath()}, - }) + // Create a new snapshot of the partition + snapshot, err := snapshotManager.GetSnapshot(ctx, []string{repo.GetRelativePath()}, false) + require.NoError(t, err) + defer testhelper.MustClose(t, snapshot) + + // Create a mock transaction which depends on the newly created snapshot. 
+ txn := &mockTransaction{ + db: db, + fs: fsrecorder.NewFS(snapshot.Root(), wal.NewEntry(testhelper.TempDir(t))), + } + + // Setup snapshotter + s, err := NewRaftSnapshotter(cfg.Raft, logger, NewMetrics().Scope("default")) require.NoError(t, err) // Package partition's disk into snapshot @@ -101,29 +88,57 @@ func TestRaftSnapshotter_materializeSnapshot(t *testing.T) { require.NoError(t, err) defer tar.Close() + // This commit is a no-op require.NoError(t, txn.Commit(ctx)) // Tar should contain a directory with repository - testhelper.ContainsTarState(t, bufio.NewReader(tar), testhelper.DirectoryState{ + directoryState := testhelper.DirectoryState{ filepath.Join("fs", repo.GetRelativePath()): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "HEAD"): {Mode: archive.TarFileMode, Content: []byte("ref: refs/heads/main\n")}, filepath.Join("fs", repo.GetRelativePath(), "objects"): {Mode: archive.DirectoryMode}, filepath.Join("fs", repo.GetRelativePath(), "objects", "info"): {Mode: archive.DirectoryMode}, filepath.Join("fs", repo.GetRelativePath(), "objects", "pack"): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "refs"): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "refs", "heads"): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "refs", "tags"): {Mode: archive.DirectoryMode}, - "kv-state": {Mode: archive.TarFileMode, Content: []byte{}}, - }) + filepath.Join("fs", repo.GetRelativePath(), "config"): { + Mode: archive.TarFileMode, + Content: "config content", + ParseContent: func(tb testing.TB, path string, content []byte) any { + require.Equal(t, filepath.Join("fs", repo.GetRelativePath(), "config"), path) + return "config content" + }, + }, + filepath.Join("fs", repo.GetRelativePath(), "refs"): {Mode: archive.DirectoryMode}, + "kv-state": {Mode: archive.TarFileMode, ParseContent: func(tb testing.TB, path string, content []byte) any { + var keyPair 
gitalypb.KVPair + require.NoError(t, protodelim.UnmarshalFrom(bytes.NewReader(content), &keyPair)) + + testhelper.ProtoEqual(t, &gitalypb.KVPair{ + Key: []byte("hello"), + Value: []byte("world"), + }, &keyPair) + return nil + }}, + } + if testhelper.IsReftableEnabled() { + directoryState[filepath.Join("fs", repo.GetRelativePath(), "refs", "heads")] = testhelper.DirectoryEntry{ + Mode: archive.TarFileMode, + Content: []byte("this repository uses the reftable format\n"), + } + } else { + directoryState[filepath.Join("fs", repo.GetRelativePath(), "HEAD")] = testhelper.DirectoryEntry{ + Mode: archive.TarFileMode, Content: []byte("ref: refs/heads/main\n"), + } + directoryState[filepath.Join("fs", repo.GetRelativePath(), "refs", "heads")] = testhelper.DirectoryEntry{ + Mode: archive.DirectoryMode, + } + directoryState[filepath.Join("fs", repo.GetRelativePath(), "refs", "tags")] = testhelper.DirectoryEntry{ + Mode: archive.DirectoryMode, + } + } + testhelper.ContainsTarState(t, bufio.NewReader(tar), directoryState) } func TestRaftStorage_TriggerSnapshot(t *testing.T) { t.Parallel() - if !testhelper.IsWALEnabled() { - t.Skip(`Transactions must be enabled for raft snapshots to work.`) - } - ctx := testhelper.Context(t) t.Run("reject snapshot creation if no transaction found in context", func(t *testing.T) { // Create a mock Storage and mock RaftSnapshotter @@ -227,9 +242,25 @@ func (m *MockRaftSnapshotter) materializeSnapshot(snapshotMetadata SnapshotMetad }, nil } -type mockTransaction struct{ storage.Transaction } +type mockTransaction struct { + storage.Transaction + db keyvalue.Transactioner + fs fsrecorder.FS +} -// Mock SnapshotLSN func (*mockTransaction) SnapshotLSN() storage.LSN { return storage.LSN(1) } + +func (m *mockTransaction) KV() keyvalue.ReadWriter { + return m.db.NewTransaction(true) +} + +func (m *mockTransaction) FS() storage.FS { + return m.fs +} + +func (m *mockTransaction) Commit(context.Context) error { + // No-Op + return nil +} diff --git 
a/internal/gitaly/storage/raftmgr/storage.go b/internal/gitaly/storage/raftmgr/storage.go index 97a4644b5a38f87aef3621da488bbde417b94ccf..cc46b24b3781f68e75e16a20f34b9a4669c1ddfe 100644 --- a/internal/gitaly/storage/raftmgr/storage.go +++ b/internal/gitaly/storage/raftmgr/storage.go @@ -115,6 +115,9 @@ type Storage struct { consumer storage.LogConsumer stagingDir string snapshotter Snapshotter + + // hooks is a collection of hooks, used in test environment to intercept critical events + hooks testHooks } // raftManifestPath returns the path to the manifest file within a log entry directory. The manifest file contains @@ -175,6 +178,7 @@ func NewStorage( consumer: consumer, stagingDir: stagingDirectory, snapshotter: snapshotter, + hooks: noopHooks(), }, nil } @@ -375,6 +379,8 @@ func (s *Storage) saveHardState(hardState raftpb.HardState) error { committedLSN := storage.LSN(hardState.Commit) if err := func() error { + s.hooks.BeforeSaveHardState() + s.mutex.Lock() defer s.mutex.Unlock() @@ -490,6 +496,12 @@ func (s *Storage) draftLogEntry(raftEntry raftpb.Entry, callback func(*wal.Entry }); err != nil { return fmt.Errorf("writing manifest file: %w", err) } + // The fsync is essential to flush the content of the manifest file itself. We also need to fsync the parent to + // ensure the creation of the file is flushed. That part will be covered in insertLogEntry after the Raft + // artifact file is created. + if err := safe.NewSyncer().Sync(s.ctx, wal.ManifestPath(walEntry.Directory())); err != nil { + return fmt.Errorf("sync raft manifest file: %w", err) + } // Finally, insert it to WAL. return s.insertLogEntry(raftEntry, logEntryPath) @@ -497,6 +509,8 @@ func (s *Storage) draftLogEntry(raftEntry raftpb.Entry, callback func(*wal.Entry // insertLogEntry inserts a log entry to WAL at a certain position with respective Raft metadata. 
func (s *Storage) insertLogEntry(raftEntry raftpb.Entry, logEntryPath string) error { + s.hooks.BeforeInsertLogEntry(raftEntry.Index) + s.mutex.Lock() defer s.mutex.Unlock() diff --git a/internal/gitaly/storage/raftmgr/storage_test.go b/internal/gitaly/storage/raftmgr/storage_test.go index e69db97bf50c351ee095368cf06f15943d358d6a..e007facae65ac843dac12afccdfdff58bfdddb4a 100644 --- a/internal/gitaly/storage/raftmgr/storage_test.go +++ b/internal/gitaly/storage/raftmgr/storage_test.go @@ -3,18 +3,14 @@ package raftmgr import ( "context" "fmt" - "sync" "testing" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper" - lg "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -22,48 +18,6 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" ) -type mockConsumer struct { - notifications []mockNotification - mutex sync.Mutex -} - -type mockNotification struct { - storageName string - partitionID storage.PartitionID - lowWaterMark storage.LSN - highWaterMark storage.LSN -} - -func (mc *mockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { - mc.mutex.Lock() - defer mc.mutex.Unlock() - mc.notifications = append(mc.notifications, mockNotification{ - storageName: storageName, - partitionID: partitionID, - lowWaterMark: lowWaterMark, - highWaterMark: committedLSN, - }) -} - -func (mc *mockConsumer) 
GetNotifications() []mockNotification { - mc.mutex.Lock() - defer mc.mutex.Unlock() - return mc.notifications -} - -func getTestDBManager(t *testing.T, ctx context.Context, cfg config.Cfg, logger lg.Logger) keyvalue.Transactioner { - t.Helper() - - dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) - require.NoError(t, err) - t.Cleanup(dbMgr.Close) - - db, err := dbMgr.GetDB(cfg.Storages[0].Name) - require.NoError(t, err) - - return db -} - func setupStorage(t *testing.T, ctx context.Context, cfg config.Cfg) *Storage { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index c6c4a4694c30ebb56c1fa7cfbbf447b96e34b06a..6c24a02451165d641eda834dc438b7de29a371f8 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -2,9 +2,15 @@ package raftmgr import ( "context" + "sync" "testing" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "go.etcd.io/etcd/raft/v3/raftpb" @@ -15,6 +21,7 @@ func TestMain(m *testing.M) { } type mockRaftManager struct { + RaftManager logger logger.LogrusLogger transport Transport logManager storage.LogManager @@ -33,3 +40,50 @@ func (m *mockRaftManager) GetLogReader() storage.LogReader { func (m *mockRaftManager) Step(ctx context.Context, msg raftpb.Message) error { return nil } + +type mockConsumer struct { + notifications []mockNotification + mutex sync.Mutex 
+} + +type mockNotification struct { + storageName string + partitionID storage.PartitionID + lowWaterMark storage.LSN + highWaterMark storage.LSN +} + +func (mc *mockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { + mc.mutex.Lock() + defer mc.mutex.Unlock() + mc.notifications = append(mc.notifications, mockNotification{ + storageName: storageName, + partitionID: partitionID, + lowWaterMark: lowWaterMark, + highWaterMark: committedLSN, + }) +} + +func (mc *mockConsumer) GetNotifications() []mockNotification { + mc.mutex.Lock() + defer mc.mutex.Unlock() + return mc.notifications +} + +func openTestDB(t *testing.T, ctx context.Context, cfg config.Cfg, logger logger.Logger) *databasemgr.DBManager { + dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) + require.NoError(t, err) + return dbMgr +} + +func getTestDBManager(t *testing.T, ctx context.Context, cfg config.Cfg, logger logger.Logger) keyvalue.Transactioner { + t.Helper() + + dbMgr := openTestDB(t, ctx, cfg, logger) + t.Cleanup(dbMgr.Close) + + db, err := dbMgr.GetDB(cfg.Storages[0].Name) + require.NoError(t, err) + + return db +} diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 82a92808977862e08d07372444c31970d2665176..90a06c15d4387290c95959d5c236d3cfd0fb0d74 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -6,8 +6,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -15,10 +17,12 @@ import ( // Factory is factory type that can create new partitions. type Factory struct { - cmdFactory gitcmd.CommandFactory - repoFactory localrepo.Factory - metrics Metrics - logConsumer storage.LogConsumer + cmdFactory gitcmd.CommandFactory + repoFactory localrepo.Factory + partitionMetrics Metrics + logConsumer storage.LogConsumer + raftCfg config.Raft + raftFactory raftmgr.RaftManagerFactory } // New returns a new Partition instance. @@ -26,8 +30,7 @@ func (f Factory) New( logger logger.Logger, partitionID storage.PartitionID, db keyvalue.Transactioner, - storageName string, - storagePath string, + storageName string, storagePath string, absoluteStateDir string, stagingDir string, ) storagemgr.Partition { @@ -50,7 +53,42 @@ func (f Factory) New( } } - logManager := log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer, positionTracker) + var logManager storage.LogManager + if f.raftCfg.Enabled { + factory := f.raftFactory + if factory == nil { + factory = raftmgr.DefaultFactory(f.raftCfg) + } + + raftStorage, err := raftmgr.NewStorage( + f.raftCfg, + logger, + storageName, + partitionID, + db, + stagingDir, + absoluteStateDir, + f.logConsumer, + positionTracker, + f.partitionMetrics.raft, + ) + if err != nil { + panic(fmt.Errorf("creating raft storage: %w", err)) + } + raftManager, err := factory( + storageName, + partitionID, + raftStorage, + logger, + ) + if err != nil { + panic(fmt.Errorf("creating raft manager: %w", err)) + } + logManager = raftManager + } else { + logManager = log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer, positionTracker) + } + return NewTransactionManager( partitionID, logger, @@ -61,22 +99,31 @@ func (f Factory) New( stagingDir, f.cmdFactory, 
repoFactory, - f.metrics.Scope(storageName), + f.partitionMetrics.Scope(storageName), logManager, ) } -// NewFactory returns a new Factory. +// NewFactory creates a partition factory with the given components: +// - cmdFactory: Used to create Git commands +// - repoFactory: Used to create local repository instances +// - metrics: Used to track partition operations +// - logConsumer: Consumes WAL entries (optional, can be nil) +// - raftFactory: Creates Raft managers for replicated partitions (optional, can be nil) func NewFactory( cmdFactory gitcmd.CommandFactory, repoFactory localrepo.Factory, - metrics Metrics, + partitionMetrics Metrics, logConsumer storage.LogConsumer, + raftCfg config.Raft, + raftFactory raftmgr.RaftManagerFactory, ) Factory { return Factory{ - cmdFactory: cmdFactory, - repoFactory: repoFactory, - metrics: metrics, - logConsumer: logConsumer, + cmdFactory: cmdFactory, + repoFactory: repoFactory, + partitionMetrics: partitionMetrics, + logConsumer: logConsumer, + raftCfg: raftCfg, + raftFactory: raftFactory, } } diff --git a/internal/gitaly/storage/storagemgr/partition/metrics.go b/internal/gitaly/storage/storagemgr/partition/metrics.go index 738221b1a4a69078f56dac359536b52247f60ebe..a97033a559a9b3946bd8d8d63d1336fb1880de6b 100644 --- a/internal/gitaly/storage/storagemgr/partition/metrics.go +++ b/internal/gitaly/storage/storagemgr/partition/metrics.go @@ -3,6 +3,7 @@ package partition import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot" ) @@ -12,6 +13,7 @@ type Metrics struct { // them is the responsibility of the caller. 
housekeeping *housekeeping.Metrics snapshot snapshot.Metrics + raft *raftmgr.Metrics commitQueueDepth *prometheus.GaugeVec commitQueueWaitSeconds *prometheus.HistogramVec @@ -30,6 +32,7 @@ func NewMetrics(housekeeping *housekeeping.Metrics) Metrics { return Metrics{ housekeeping: housekeeping, snapshot: snapshot.NewMetrics(), + raft: raftmgr.NewMetrics(), commitQueueDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "gitaly_transaction_commit_queue_depth", Help: "Records the number transactions waiting in the commit queue.", @@ -65,6 +68,7 @@ func (m Metrics) Describe(out chan<- *prometheus.Desc) { // Collect implements prometheus.Collector. func (m Metrics) Collect(out chan<- prometheus.Metric) { m.snapshot.Collect(out) + m.raft.Collect(out) m.commitQueueDepth.Collect(out) m.commitQueueWaitSeconds.Collect(out) m.transactionControlStatementDurationSeconds.Collect(out) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go index 92429d21f9a84d8c442764b49055565e772676fd..4075dd3c31f290dac9c6cc649569798d3d699901 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go @@ -175,7 +175,7 @@ func TestMigrationManager_Begin(t *testing.T) { m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)) - factory := partition.NewFactory(cmdFactory, repositoryFactory, m, nil) + factory := partition.NewFactory(cmdFactory, repositoryFactory, m, nil, cfg.Raft, nil) tm := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir) ctx, cancel := context.WithCancel(ctx) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go index 
ca15cbc64755e956c3ea18e084e74a5360f8a03a..78965e9d70042cb86c000fea456357feaca8936a 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go @@ -133,7 +133,7 @@ func testReftableMigration(t *testing.T, ctx context.Context) { cmdFactory := gittest.NewCommandFactory(t, cfg) localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) - partitionFactory := partition.NewFactory(cmdFactory, localRepoFactory, partition.NewMetrics(nil), nil) + partitionFactory := partition.NewFactory(cmdFactory, localRepoFactory, partition.NewMetrics(nil), nil, cfg.Raft, nil) database, err := keyvalue.NewBadgerStore(testhelper.SharedLogger(t), t.TempDir()) require.NoError(t, err) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 24671dad1fc9afb5f7b420cb00569adc1f73c9c9..6f4ece9124b7eabd771f6008b43a1014864fbcba 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -10,11 +10,14 @@ import ( "os" "path/filepath" "reflect" + "regexp" "sort" + "strconv" "strings" "sync" "testing" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -36,10 +39,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" 
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) func TestMain(m *testing.M) { @@ -672,14 +678,30 @@ func RequireDatabase(tb testing.TB, ctx context.Context, database keyvalue.Trans // Unmarshal the actual value to the same type as the expected value. actualValue := reflect.New(reflect.TypeOf(expectedValue).Elem()).Interface().(proto.Message) - require.NoError(tb, proto.Unmarshal(value, actualValue)) + require.NoErrorf(tb, proto.Unmarshal(value, actualValue), "unmarshalling value in db: %q", key) + + if _, isAny := expectedValue.(*anypb.Any); isAny { + // Verify if the database contains the key, but the content does not matter. + actualState[string(key)] = expectedValue + continue + } actualState[string(key)] = actualValue } return nil })) - require.Empty(tb, unexpectedKeys, "database contains unexpected keys") + require.Emptyf(tb, unexpectedKeys, "database contains unexpected keys") + + // Ignore optional keys in expectedState but not in actualState + for k, v := range expectedState { + if _, isAny := v.(*anypb.Any); isAny { + if _, exist := actualState[k]; !exist { + delete(expectedState, k) + } + } + } + testhelper.ProtoEqual(tb, expectedState, actualState) } @@ -960,6 +982,8 @@ type RepositoryAssertion struct { type StateAssertion struct { // Database is the expected state of the database. Database DatabaseState + // NotOffsetDatabaseInRaft indicates if the LSN in the database should not be shifted. + NotOffsetDatabaseInRaft bool // Directory is the expected state of the manager's state directory in the repository. Directory testhelper.DirectoryState // Repositories is the expected state of the repositories in the storage. 
The key is @@ -1047,11 +1071,27 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas return NewMetrics(housekeeping.NewMetrics(setup.Config.Prometheus)) } + var raftFactory raftmgr.RaftManagerFactory + var raftEntryRecorder *raftmgr.EntryRecorder + clusterID := uuid.New().String() + if testhelper.IsRaftEnabled() { + raftEntryRecorder = raftmgr.NewEntryRecorder() + setup.Config.Raft = config.DefaultRaftConfig(clusterID) + // Speed up initial election overhead in the test setup + setup.Config.Raft.ElectionTicks = 5 + setup.Config.Raft.RTTMilliseconds = 100 + setup.Config.Raft.SnapshotDir = testhelper.TempDir(t) + + raftFactory = func(storageName string, partitionID storage.PartitionID, raftStorage *raftmgr.Storage, logger logging.Logger) (*raftmgr.Manager, error) { + return raftmgr.NewManager(storageName, partitionID, setup.Config.Raft, raftStorage, logger, raftmgr.WithEntryRecorder(raftEntryRecorder)) + } + } + var ( // managerRunning tracks whether the manager is running or closed. managerRunning bool // factory is the factory that produces the current TransactionManager - factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer) + factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer, setup.Config.Raft, raftFactory) // transactionManager is the current TransactionManager instance. 
transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) // managerErr is used for synchronizing manager closing and returning @@ -1103,7 +1143,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) - installHooks(transactionManager, &inflightTransactions, step.Hooks) + installHooks(transactionManager, &inflightTransactions, step.Hooks, raftEntryRecorder) go func() { defer func() { @@ -1145,7 +1185,11 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.NoError(t, err) tx := transaction.(*Transaction) - require.Equalf(t, step.ExpectedSnapshotLSN, tx.SnapshotLSN(), "mismatched ExpectedSnapshotLSN") + expectedSnapshotLSN := step.ExpectedSnapshotLSN + if testhelper.IsRaftEnabled() { + expectedSnapshotLSN = raftEntryRecorder.Offset(expectedSnapshotLSN) + } + require.Equalf(t, expectedSnapshotLSN, tx.SnapshotLSN(), "mismatched ExpectedSnapshotLSN") require.NotEmpty(t, tx.Root(), "empty Root") require.Contains(t, tx.Root(), transactionManager.snapshotsDir()) @@ -1420,7 +1464,11 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction := openTransactions[step.TransactionID] transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: - require.NoError(t, transactionManager.logManager.AcknowledgePosition(log.ConsumerPosition, step.LSN)) + lsn := step.LSN + if testhelper.IsRaftEnabled() { + lsn = raftEntryRecorder.Offset(lsn) + } + require.NoError(t, transactionManager.logManager.AcknowledgePosition(log.ConsumerPosition, lsn)) case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") 
transaction := openTransactions[step.TransactionID] @@ -1487,6 +1535,36 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.NoError(t, err) } + // If singular Raft cluster is enabled, the Raft handler starts to insert raft log entries. We need to offset + // the expected LSN to account for LSN shifting. As the order of insertion is not deterministic, the LSN + // shifting depends on a Raft event recorder. + // + // Without Raft: + // Entry 1 (update A) -> Entry 2 (update B) + // + // With Raft: + // Entry 1 (ConfChange) -> Entry 2 (Empty verification entry) -> Entry 3 (update A) -> Entry 4 (update B) + if testhelper.IsRaftEnabled() && raftEntryRecorder.Len() > 0 { + if !tc.expectedState.NotOffsetDatabaseInRaft { + appliedLSN, exist := tc.expectedState.Database[string(keyAppliedLSN)] + if exist { + // If expected applied LSN is present, offset expected LSN. + appliedLSN := appliedLSN.(*gitalypb.LSN) + tc.expectedState.Database[string(keyAppliedLSN)] = raftEntryRecorder.Offset(storage.LSN(appliedLSN.GetValue())).ToProto() + } else { + // Otherwise, the test expects no applied log entry in cases such as invalid transactions. + // Regardless, raft log entries are applied successfully. + if tc.expectedState.Database == nil { + tc.expectedState.Database = DatabaseState{} + } + tc.expectedState.Database[string(keyAppliedLSN)] = raftEntryRecorder.Latest().ToProto() + } + } + // Those are raft-specific keys. They should not be a concern of TransactionManager tests. 
+ for _, key := range raftmgr.RaftDBKeys { + tc.expectedState.Database[string(key)] = &anypb.Any{} + } + } RequireDatabase(t, ctx, database, tc.expectedState.Database) expectedRepositories := tc.expectedState.Repositories @@ -1538,6 +1616,8 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } } + expectedDirectory = modifyDirectoryStateForRaft(t, expectedDirectory, transactionManager, raftEntryRecorder) + testhelper.RequireDirectoryState(t, stateDir, "", expectedDirectory) expectedStagingDirState := testhelper.DirectoryState{ @@ -1555,6 +1635,64 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas testhelper.RequireDirectoryState(t, transactionManager.stagingDirectory, "", expectedStagingDirState) } +// modifyDirectoryStateForRaft modifies log entry directory. It does three major things: +// - Shift expected log entry paths to the corresponding location. The LSN of normal entries are affected by Raft's +// internal entries. +// - Insert RAFT metadata files. +// - Insert Raft's internal log entries if they are not pruned away. 
+func modifyDirectoryStateForRaft(t *testing.T, expectedDirectory testhelper.DirectoryState, tm *TransactionManager, recorder *raftmgr.EntryRecorder) testhelper.DirectoryState { + if !testhelper.IsRaftEnabled() { + return expectedDirectory + } + + newExpectedDirectory := testhelper.DirectoryState{} + lsnRegex := regexp.MustCompile(`/wal/(\d+)\/?.*`) + for path, state := range expectedDirectory { + matches := lsnRegex.FindStringSubmatch(path) + if len(matches) == 0 { + // If no LSN found, retain the original entry + newExpectedDirectory[path] = state + continue + } + // Extract and offset the LSN + lsnStr := matches[1] + lsn, err := strconv.ParseUint(lsnStr, 10, 64) + require.NoError(t, err) + + offsetLSN := recorder.Offset(storage.LSN(lsn)) + offsetPath := strings.Replace(path, fmt.Sprintf("/%s", lsnStr), fmt.Sprintf("/%s", offsetLSN.String()), 1) + newExpectedDirectory[offsetPath] = state + newExpectedDirectory[fmt.Sprintf("/wal/%s/RAFT", offsetLSN)] = testhelper.DirectoryEntry{ + Mode: mode.File, + Content: "anything", + ParseContent: func(tb testing.TB, path string, content []byte) any { + // The test should not be concerned with the actual content of RAFT file. + return "anything" + }, + } + } + + // Insert Raft-specific log entries into the new directory structure + raftEntries := recorder.FromRaft() + for lsn, raftEntry := range raftEntries { + if lsn >= tm.GetLogReader().LowWaterMark() { + newExpectedDirectory[fmt.Sprintf("/wal/%s", lsn)] = testhelper.DirectoryEntry{Mode: mode.Directory} + newExpectedDirectory[fmt.Sprintf("/wal/%s/MANIFEST", lsn)] = manifestDirectoryEntry(raftEntry) + newExpectedDirectory[fmt.Sprintf("/wal/%s/RAFT", lsn)] = testhelper.DirectoryEntry{ + Mode: mode.File, + Content: "anything", + ParseContent: func(tb testing.TB, path string, content []byte) any { + // The test should not be concerned with the actual content of RAFT file. 
+ return "anything" + }, + } + } + } + + // Update expected directory + return newExpectedDirectory +} + func checkManagerError(t *testing.T, ctx context.Context, managerErrChannel chan error, mgr *TransactionManager) (bool, error) { t.Helper() diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index e29c0d51cf4c9d286a71f36b84641560a3b68f32..0aad4c28496e3294df95d82ec7b717f1d4016058 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -462,7 +462,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti }, }, { - desc: "stopped manager does not prune acknowledged entry", + desc: "stopped manager does not prune acknowledged entry", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, "asserting pruning of internal log entries in this case is not relibale") + }, customSetup: customSetup, steps: steps{ StartManager{}, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 529afda61dc1cb4d430bf70d8e8ffb390333a7ac..0915106430b350bb8679d1923bbf15428956673c 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -8,6 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/refdb" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) @@ -22,10 +23,18 
@@ type hookContext struct { closeManager func() // lsn stores the LSN context when the hook is triggered. lsn storage.LSN + // raftEntryRecorder normal and Raft-specific entries that are inserted into the WAL. It is used to filter out entries + // injected by Raft in hook executions. + raftEntryRecorder *raftmgr.EntryRecorder } // installHooks takes the hooks in the test setup and configures them in the TransactionManager. -func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) { +func installHooks( + mgr *TransactionManager, + inflightTransactions *sync.WaitGroup, + hooks testTransactionHooks, + raftEntryRecorder *raftmgr.EntryRecorder, +) { for destination, source := range map[*func()]hookFunc{ &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, &mgr.testHooks.beforeRunExiting: func(hookContext) { @@ -56,8 +65,9 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, runHook := source *destination = func(lsn storage.LSN) { runHook(hookContext{ - closeManager: mgr.Close, - lsn: lsn, + closeManager: mgr.Close, + lsn: lsn, + raftEntryRecorder: raftEntryRecorder, }) } } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 7f2c97bb10e80eb2df6a2d209cd9287bd90643b4..57ff1158538c70040bae7f19b17f7dfbde24e37a 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -43,8 +43,16 @@ var errSimulatedCrash = errors.New("simulated crash") // simulateCrashHook returns a hook function that panics with errSimulatedCrash. 
var simulateCrashHook = func() func(hookContext) { - return func(hookContext) { - panic(errSimulatedCrash) + return func(c hookContext) { + if !testhelper.IsRaftEnabled() { + // Always panic if Raft is not enabled + panic(errSimulatedCrash) + } else if c.raftEntryRecorder == nil { + // Some hooks don't include Raft entry recorder. + panic(errSimulatedCrash) + } else if !c.raftEntryRecorder.IsFromRaft(c.lsn) { + panic(errSimulatedCrash) + } } } @@ -491,6 +499,11 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio }(), { desc: "commit returns if transaction processing stops before transaction acceptance", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `The hook is installed before appending log entry, before + recorder is activated. Hence, it's not feasible to differentiate between normal + entries and Raft internal entries`) + }, steps: steps{ StartManager{ Hooks: testTransactionHooks{ @@ -532,6 +545,15 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio }, }, expectedState: StateAssertion{ + Database: testhelper.WithOrWithoutRaft( + // The process crashes before apply but after it's committed. So, only + // committedLSN is persisted. 
+ DatabaseState{ + string(keyAppliedLSN): storage.LSN(2).ToProto(), + }, + DatabaseState{}, + ), + NotOffsetDatabaseInRaft: true, Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -1459,6 +1481,10 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio ExpectedError: errSimulatedCrash, }, }, + expectedState: StateAssertion{ + Database: DatabaseState{}, + NotOffsetDatabaseInRaft: true, + }, }, { desc: "transaction rollbacked after already being rollbacked", @@ -1755,6 +1781,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t return []transactionTestCase{ { desc: "manager has just initialized", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { @@ -1764,6 +1793,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "a transaction has one reader", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, Begin{ @@ -1872,6 +1904,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "a transaction has multiple readers", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, Begin{ @@ -2037,6 +2072,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "committed read-only transaction are not kept", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ 
StartManager{}, Begin{ @@ -2073,6 +2111,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "transaction manager cleans up left-over committed entries when appliedLSN == appendedLSN", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, Begin{ @@ -2202,6 +2243,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t desc: "transaction manager cleans up left-over committed entries when appliedLSN < appendedLSN", skip: func(t *testing.T) { testhelper.SkipWithReftable(t, "test requires manual log addition") + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) }, steps: steps{ StartManager{}, @@ -2413,7 +2455,7 @@ func BenchmarkTransactionManager(b *testing.B) { // Valid partition IDs are >=1. testPartitionID := storage.PartitionID(i + 1) - factory := NewFactory(cmdFactory, repositoryFactory, m, nil) + factory := NewFactory(cmdFactory, repositoryFactory, m, nil, cfg.Raft, nil) // transactionManager is the current TransactionManager instance. 
manager := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) diff --git a/internal/testhelper/testcfg/gitaly.go b/internal/testhelper/testcfg/gitaly.go index 726092838afe7827659953c276b92f5ad8fd214c..e8ab8c16fe77f9bca5b7e16ed1acac0a47b580f2 100644 --- a/internal/testhelper/testcfg/gitaly.go +++ b/internal/testhelper/testcfg/gitaly.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/pelletier/go-toml/v2" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" @@ -172,6 +173,14 @@ func (gc *GitalyCfgBuilder) Build(tb testing.TB) config.Cfg { } cfg.Transactions.Enabled = testhelper.IsWALEnabled() + if testhelper.IsRaftEnabled() && !testhelper.IsPraefectEnabled() { + cfg.Transactions.Enabled = true + cfg.Raft = config.DefaultRaftConfig(uuid.New().String()) + // Speed up initial election overhead in the test setup + cfg.Raft.ElectionTicks = 5 + cfg.Raft.RTTMilliseconds = 100 + cfg.Raft.SnapshotDir = testhelper.TempDir(tb) + } // We force the use of bundled (embedded) binaries unless we're specifically executing tests // against a custom version of Git. diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index d1d51e3f4f6eb3f8ffa3b4a4b3d849674b64b2b2..9130e142f14bdf12475b8e69cfd9cfd6da6e63c7 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -57,6 +57,30 @@ func IsWALEnabled() bool { return ok } +// IsRaftEnabled returns whether Raft single cluster is enabled in this testing run. +func IsRaftEnabled() bool { + _, ok := os.LookupEnv("GITALY_TEST_RAFT") + if ok && !IsWALEnabled() { + panic("GITALY_TEST_WAL must be enabled") + } + return ok +} + +// WithOrWithoutRaft returns a value correspondingly to if Raft is enabled or not. 
+func WithOrWithoutRaft[T any](raftVal, noRaftVal T) T { + if IsRaftEnabled() { + return raftVal + } + return noRaftVal +} + +// SkipWithRaft skips the test if Raft is enabled in this testing run. +func SkipWithRaft(tb testing.TB, reason string) { + if IsRaftEnabled() { + tb.Skip(reason) + } +} + // SkipWithWAL skips the test if write-ahead logging is enabled in this testing run. A reason // should be provided either as a description or a link to an issue to explain why the test is // skipped. diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 7c9a8301e36fca1901e510106e6d07f808bf3cfb..75f1493cdaff3e3e6c492ee8e2d46591765f0ec2 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -6,6 +6,7 @@ import ( "os" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth" @@ -30,6 +31,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" @@ -358,6 +360,16 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte require.NoError(tb, err) tb.Cleanup(dbMgr.Close) + var raftFactory raftmgr.RaftManagerFactory + if testhelper.IsRaftEnabled() && !testhelper.IsPraefectEnabled() { + cfg.Raft = config.DefaultRaftConfig(uuid.New().String()) + // Speed up initial election overhead in the test setup + cfg.Raft.ElectionTicks = 5 + cfg.Raft.RTTMilliseconds = 100 + cfg.Raft.SnapshotDir 
= testhelper.TempDir(tb) + raftFactory = raftmgr.DefaultFactory(cfg.Raft) + } + nodeMgr, err := nodeimpl.NewManager( cfg.Storages, storagemgr.NewFactory( @@ -369,6 +381,8 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + raftFactory, ), migration.NewMetrics(), migrations,