From 91f69beadd3e982231466370521cf058afda9094 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 25 Feb 2025 16:26:26 +0700 Subject: [PATCH 01/10] raft: Improve error propagation in Registry's event tracking This commit refactors the error handling mechanism in the Registry for improved error propagation. The Waiter struct's channel is changed from a struct channel to an error channel, enabling error information to be directly communicated to waiters. These improvements allow clients to receive specific error information when their events are untracked, providing better context about why their operations were interrupted. It also prevents a race when the waiter channel is closed before the error is set. A new UntrackAll method is added to handle complete registry cleanup scenarios. --- .../gitaly/storage/raftmgr/event_registry.go | 29 +++++++----- .../storage/raftmgr/event_registry_test.go | 45 ++++++++++++++++--- internal/gitaly/storage/raftmgr/manager.go | 5 +++ 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/event_registry.go b/internal/gitaly/storage/raftmgr/event_registry.go index 468a95dc709..0b93909fe9e 100644 --- a/internal/gitaly/storage/raftmgr/event_registry.go +++ b/internal/gitaly/storage/raftmgr/event_registry.go @@ -1,16 +1,11 @@ package raftmgr import ( - "fmt" "sync" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" ) -// ErrObsoleted is returned when an event associated with a LSN is shadowed by another one with higher term. That event -// must be unlocked and removed from the registry. -var ErrObsoleted = fmt.Errorf("obsoleted event, shadowed by a log entry with higher term") - // EventID uniquely identifies an event in the registry. type EventID uint64 @@ -18,8 +13,7 @@ type EventID uint64 type Waiter struct { ID EventID LSN storage.LSN - C chan struct{} - Err error + C chan error } // Registry manages events and their associated waiters, enabling the registration @@ -47,7 +41,7 @@ func (r *Registry) Register() *Waiter { r.nextEventID++ waiter := &Waiter{ ID: r.nextEventID, - C: make(chan struct{}), + C: make(chan error, 1), } r.waiters[r.nextEventID] = waiter @@ -67,8 +61,9 @@ func (r *Registry) AssignLSN(id EventID, lsn storage.LSN) { waiter.LSN = lsn } -// UntrackSince untracks all events having LSNs greater than or equal to the input LSN. -func (r *Registry) UntrackSince(lsn storage.LSN) { +// UntrackSince untracks all events having LSNs greater than or equal to the input LSN. The input error is assigned to +// impacted events. +func (r *Registry) UntrackSince(lsn storage.LSN, err error) { r.mu.Lock() defer r.mu.Unlock() @@ -79,12 +74,24 @@ func (r *Registry) UntrackSince(lsn storage.LSN) { } } for _, id := range toRemove { + r.waiters[id].C <- err close(r.waiters[id].C) - r.waiters[id].Err = ErrObsoleted delete(r.waiters, id) } } +// UntrackAll untracks all events. The input error is assigned to impacted events. +func (r *Registry) UntrackAll(err error) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, w := range r.waiters { + w.C <- err + close(w.C) + } + clear(r.waiters) +} + // Untrack closes the channel associated with a given EventID and removes the waiter from the registry once the event is // committed. This function returns if the registry is still tracking the event. 
func (r *Registry) Untrack(id EventID) bool { diff --git a/internal/gitaly/storage/raftmgr/event_registry_test.go b/internal/gitaly/storage/raftmgr/event_registry_test.go index 8c55f65a578..090e3f61269 100644 --- a/internal/gitaly/storage/raftmgr/event_registry_test.go +++ b/internal/gitaly/storage/raftmgr/event_registry_test.go @@ -1,6 +1,7 @@ package raftmgr import ( + "fmt" "sync" "testing" "time" @@ -54,7 +55,7 @@ func TestRegistry_Untrack(t *testing.T) { action: func(t *testing.T, r *Registry) []*Waiter { require.False(t, r.Untrack(1234), "event should not be tracked") - c := make(chan struct{}) + c := make(chan error, 1) close(c) return []*Waiter{{ID: 99999, C: c}} // Non-existent event }, @@ -113,7 +114,7 @@ func TestRegistry_UntrackSince(t *testing.T) { registry.AssignLSN(waiter3.ID, 12) // Call UntrackSince with threshold LSN - registry.UntrackSince(11) + registry.UntrackSince(11, fmt.Errorf("a random error")) // Waiters with LSN > 10 should be obsoleted select { @@ -123,17 +124,17 @@ func TestRegistry_UntrackSince(t *testing.T) { // Waiter1 should not be closed } select { - case <-waiter2.C: + case err := <-waiter2.C: // Expected behavior, channel closed - require.Equal(t, ErrObsoleted, waiter2.Err) + require.Equal(t, fmt.Errorf("a random error"), err) default: t.Fatalf("Expected channel for event %d to be closed", waiter2.ID) } select { - case <-waiter3.C: + case err := <-waiter3.C: // Expected behavior, channel closed - require.Equal(t, ErrObsoleted, waiter3.Err) + require.Equal(t, fmt.Errorf("a random error"), err) default: t.Fatalf("Expected channel for event %d to be closed", waiter3.ID) } @@ -145,6 +146,38 @@ func TestRegistry_UntrackSince(t *testing.T) { require.False(t, registry.Untrack(waiter3.ID)) } +func TestRegistry_UntrackAll(t *testing.T) { + t.Parallel() + registry := NewRegistry() + + waiter1 := registry.Register() + waiter2 := registry.Register() + waiter3 := registry.Register() + + // Assign LSNs + registry.AssignLSN(waiter1.ID, 10) + registry.AssignLSN(waiter2.ID, 11) + registry.AssignLSN(waiter3.ID, 12) + + // Call UntrackSince with threshold LSN + registry.UntrackAll(fmt.Errorf("a random error")) + + for _, w := range []*Waiter{waiter1, waiter2, waiter3} { + select { + case err := <-w.C: + // Expected behavior, channel closed + require.Equal(t, fmt.Errorf("a random error"), err) + default: + t.Fatalf("Expected channel for event %d to be closed", w.ID) + } + } + + // All waiters should not be tracked + require.False(t, registry.Untrack(waiter1.ID)) + require.False(t, registry.Untrack(waiter2.ID)) + require.False(t, registry.Untrack(waiter3.ID)) +} + func TestRegistry_ConcurrentAccess(t *testing.T) { t.Parallel() const numEvents = 100 diff --git a/internal/gitaly/storage/raftmgr/manager.go b/internal/gitaly/storage/raftmgr/manager.go index 64769fdb9a3..fc895c1848e 100644 --- a/internal/gitaly/storage/raftmgr/manager.go +++ b/internal/gitaly/storage/raftmgr/manager.go @@ -2,6 +2,7 @@ package raftmgr import ( "context" + "fmt" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "go.etcd.io/etcd/raft/v3/raftpb" @@ -13,3 +14,7 @@ type RaftManager interface { GetLogReader() storage.LogReader Step(ctx context.Context, msg raftpb.Message) error } + +// ErrObsoleted is returned when an event associated with a LSN is shadowed by another one with higher term. That event +// must be unlocked and removed from the registry. 
+var ErrObsoleted = fmt.Errorf("event is obsolete, superseded by a recent log entry with higher term") -- GitLab From 4cf4fd208c4d9a8dd190a78df48e06c55ac3d30c Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 25 Feb 2025 22:57:32 +0700 Subject: [PATCH 02/10] raft: Move common helper methods to testhelper file This commit moves some helper methods in the Storage test to the testhelper file. Those methods will be used by the manager in upcoming commits. --- .../gitaly/storage/raftmgr/storage_test.go | 46 ---------------- .../gitaly/storage/raftmgr/testhelper_test.go | 53 +++++++++++++++++++ 2 files changed, 53 insertions(+), 46 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/storage_test.go b/internal/gitaly/storage/raftmgr/storage_test.go index e69db97bf50..e007facae65 100644 --- a/internal/gitaly/storage/raftmgr/storage_test.go +++ b/internal/gitaly/storage/raftmgr/storage_test.go @@ -3,18 +3,14 @@ package raftmgr import ( "context" "fmt" - "sync" "testing" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper" - lg "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -22,48 +18,6 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" ) -type mockConsumer struct { - notifications []mockNotification - mutex sync.Mutex -} - -type mockNotification struct { - storageName string - partitionID storage.PartitionID - lowWaterMark storage.LSN - highWaterMark storage.LSN -} - -func (mc *mockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { - mc.mutex.Lock() - defer mc.mutex.Unlock() - mc.notifications = append(mc.notifications, mockNotification{ - storageName: storageName, - partitionID: partitionID, - lowWaterMark: lowWaterMark, - highWaterMark: committedLSN, - }) -} - -func (mc *mockConsumer) GetNotifications() []mockNotification { - mc.mutex.Lock() - defer mc.mutex.Unlock() - return mc.notifications -} - -func getTestDBManager(t *testing.T, ctx context.Context, cfg config.Cfg, logger lg.Logger) keyvalue.Transactioner { - t.Helper() - - dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) - require.NoError(t, err) - t.Cleanup(dbMgr.Close) - - db, err := dbMgr.GetDB(cfg.Storages[0].Name) - require.NoError(t, err) - - return db -} - func setupStorage(t *testing.T, ctx context.Context, cfg config.Cfg) *Storage { stagingDir := testhelper.TempDir(t) stateDir := testhelper.TempDir(t) diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index c6c4a4694c3..8dd0ae417e8 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -2,9 +2,15 @@ package raftmgr import ( "context" + "sync" "testing" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "go.etcd.io/etcd/raft/v3/raftpb" @@ -33,3 +39,50 @@ func (m *mockRaftManager) GetLogReader() storage.LogReader { func (m *mockRaftManager) Step(ctx context.Context, msg raftpb.Message) error { return nil } + +type mockConsumer struct { + notifications []mockNotification + mutex sync.Mutex +} + +type mockNotification struct { + storageName string + partitionID storage.PartitionID + lowWaterMark storage.LSN + highWaterMark storage.LSN +} + +func (mc *mockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { + mc.mutex.Lock() + defer mc.mutex.Unlock() + mc.notifications = append(mc.notifications, mockNotification{ + storageName: storageName, + partitionID: partitionID, + lowWaterMark: lowWaterMark, + highWaterMark: committedLSN, + }) +} + +func (mc *mockConsumer) GetNotifications() []mockNotification { + mc.mutex.Lock() + defer mc.mutex.Unlock() + return mc.notifications +} + +func openTestDB(t *testing.T, ctx context.Context, cfg config.Cfg, logger logger.Logger) *databasemgr.DBManager { + dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) + require.NoError(t, err) + return dbMgr +} + +func getTestDBManager(t *testing.T, ctx context.Context, cfg config.Cfg, logger logger.Logger) keyvalue.Transactioner { + t.Helper() + + dbMgr := openTestDB(t, ctx, cfg, logger) + t.Cleanup(dbMgr.Close) + + db, err := dbMgr.GetDB(cfg.Storages[0].Name) + require.NoError(t, err) + + return db +} -- GitLab From f7183cdcb2177061800bbadeffcbbe6728d8a76d Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 26 Feb 2025 13:10:14 +0700 Subject: [PATCH 03/10] raft: Implement Raft Manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gitaly needs a Raft Manager to handle distributed consensus in multi-node setups. This commit implements the core Raft Manager that coordinates the Raft consensus protocol within Gitaly, integrating with etcd/raft library. The Raft Manager handles several critical responsibilities: - Manages Raft node lifecycle and state persistence - Coordinates log entry replication and consensus - Handles leadership elections and changes - Processes configuration changes for cluster membership - Manages communication between nodes via transport layer Currently, the implementation focuses on single-node setups with plans for multi-node support later. The Raft Manager implements storage.LogManager interface. By default, Transaction Manager uses log.Manager. All log entries are appended to the file-system WAL. As soon as an entry is persisted, it's ready to be applied by Transaction Manager. When Raft is enabled, the whole flow is handled by raftmgr.Manager. Network transmission and quorum acknowledgment are now added to the flow. To reduce tight coupling and complexity, the flow is encapsulated inside raftmgr.Manager. Both log.Manager and raftmgr.Manager share the same interface. 
The responsibilities of three crucial components are: - log.Manager handles local log management - raftmgr.Manager handles distributed log management - partition.TransactionManager manages transactions, concurrency, snapshots, conflicts, etc. The flow looks like the following chart: ┌──────────┐ │Local Disk│ └────▲─────┘ │ log.Manager ▲ TransactionManager New Transaction │ Without Raft │ ▼ │ └─►Initialize─►txn.Commit()─►Verify─► AppendLogEntry()──►Apply │ │ │ │ With Raft ▼ ▼ ┌─►Initialize─────...──────────────────Propose │ │ raftmgr.Manager etcd/raft state machine ┌───────┴────────┐ ▼ ▼ raftmgr.Storage raftmgr.Transport │ │ ▼ │ log.Manager Network │ │ ┌─────▼────┐ ┌─────▼────┐ │Local Disk│ │Raft Group│ └──────────┘ └──────────┘ --- .../storage/raftmgr/grpc_transport_test.go | 4 +- internal/gitaly/storage/raftmgr/manager.go | 723 +++++++++++++++++- .../gitaly/storage/raftmgr/manager_test.go | 590 ++++++++++++++ .../gitaly/storage/raftmgr/testhelper_test.go | 1 + 4 files changed, 1311 insertions(+), 7 deletions(-) create mode 100644 internal/gitaly/storage/raftmgr/manager_test.go diff --git a/internal/gitaly/storage/raftmgr/grpc_transport_test.go b/internal/gitaly/storage/raftmgr/grpc_transport_test.go index 78892881e5d..2b2db087905 100644 --- a/internal/gitaly/storage/raftmgr/grpc_transport_test.go +++ b/internal/gitaly/storage/raftmgr/grpc_transport_test.go @@ -145,10 +145,10 @@ func TestGrpcTransport_SendAndReceive(t *testing.T) { require.NoError(t, err) // Create test messages - msgs := createTestMessages(t, testCluster, mgr.GetLogReader(), tc.walEntries) + msgs := createTestMessages(t, testCluster, mgr, tc.walEntries) // Send Message from leader to all followers - err = leader.transport.Send(ctx, mgr.GetLogReader(), 1, storageName, msgs) + err = leader.transport.Send(ctx, mgr, 1, storageName, msgs) if tc.expectedError != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expectedError) diff --git a/internal/gitaly/storage/raftmgr/manager.go b/internal/gitaly/storage/raftmgr/manager.go index fc895c1848e..0f778692773 100644 --- a/internal/gitaly/storage/raftmgr/manager.go +++ b/internal/gitaly/storage/raftmgr/manager.go @@ -2,19 +2,732 @@ package raftmgr import ( "context" + "errors" "fmt" + "runtime" + "sync" + "time" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/safe" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "google.golang.org/protobuf/proto" ) // RaftManager is an interface that defines the methods to orchestrate the Raft consensus protocol. type RaftManager interface { - GetEntryPath(storage.LSN) string - GetLogReader() storage.LogReader + // Embed all LogManager methods for WAL operations + storage.LogManager + + // Initialize prepares the Raft system with the given context and last applied LSN + Initialize(ctx context.Context, appliedLSN storage.LSN) error + + // Step processes a Raft message from a remote node Step(ctx context.Context, msg raftpb.Message) error } -// ErrObsoleted is returned when an event associated with a LSN is shadowed by another one with higher term. That event -// must be unlocked and removed from the registry. 
-var ErrObsoleted = fmt.Errorf("event is obsolete, superseded by a recent log entry with higher term") +var ( + // ErrObsoleted is returned when an event associated with a LSN is shadowed by another one with higher term. That event + // must be unlocked and removed from the registry. + ErrObsoleted = fmt.Errorf("event is obsolete, superseded by a recent log entry with higher term") + // ErrReadyTimeout is returned when the manager times out when waiting for Raft group to be ready. + ErrReadyTimeout = fmt.Errorf("ready timeout exceeded") + // ErrManagerStopped is returned when the main loop of Raft manager stops. + ErrManagerStopped = fmt.Errorf("raft manager stopped") +) + +const ( + // Maximum size of individual Raft messages + defaultMaxSizePerMsg = 10 * 1024 * 1024 + // Maximum number of in-flight Raft messages. This controls how many messages can be sent without acknowledgment + defaultMaxInflightMsgs = 256 +) + +// ready manages the readiness signaling of the Raft system. +type ready struct { + c chan error // Channel used to signal readiness + once sync.Once // Ensures the readiness signal is sent exactly once +} + +// set signals readiness by closing the channel exactly once. +func (r *ready) set(err error) { + r.once.Do(func() { + r.c <- err + close(r.c) + }) +} + +// ManagerOptions configures optional parameters for the Raft Manager. +type ManagerOptions struct { + // readyTimeout sets the maximum duration to wait for Raft to become ready + readyTimeout time.Duration + // opTimeout sets the maximum duration for propose, append, and commit operations + // This is primarily used in testing to detect deadlocks and performance issues + opTimeout time.Duration + // entryRecorder stores Raft log entries for testing purposes + entryRecorder *EntryRecorder +} + +// OptionFunc defines a function type for configuring ManagerOptions. +type OptionFunc func(opt ManagerOptions) ManagerOptions + +// WithReadyTimeout sets the maximum duration to wait for Raft to become ready. +// The default timeout is 5 times the election timeout. +func WithReadyTimeout(t time.Duration) OptionFunc { + return func(opt ManagerOptions) ManagerOptions { + opt.readyTimeout = t + return opt + } +} + +// WithOpTimeout sets a timeout for individual Raft operations. +// This should only be used in testing environments. +func WithOpTimeout(t time.Duration) OptionFunc { + return func(opt ManagerOptions) ManagerOptions { + opt.opTimeout = t + return opt + } +} + +// WithEntryRecorder enables recording of Raft log entries for testing. +func WithEntryRecorder(recorder *EntryRecorder) OptionFunc { + return func(opt ManagerOptions) ManagerOptions { + opt.entryRecorder = recorder + return opt + } +} + +// Manager orchestrates the Raft consensus protocol for a Gitaly partition. +// It manages configuration, state synchronization, and communication between members. +// The Manager implements the storage.LogManager interface. 
+type Manager struct { + mutex sync.Mutex + + ctx context.Context // Context for controlling manager's lifecycle + cancel context.CancelFunc + + authorityName string // Name of the storage this partition belongs to + ptnID storage.PartitionID // Unique identifier for the managed partition + node raft.Node // etcd/raft node representation + raftCfg config.Raft // etcd/raft configurations + options ManagerOptions // Additional manager configuration + logger logging.Logger // Internal logging + storage *Storage // Persistent storage for Raft logs and state + registry *Registry // Event tracking + leadership *Leadership // Current leadership information + syncer safe.Syncer // Synchronization operations + wg sync.WaitGroup // Goroutine lifecycle management + ready *ready // Initialization state tracking + started bool // Indicates if manager has been started + + // notifyQueue signals new changes or errors to clients + // Clients must process signals promptly to prevent blocking + notifyQueue chan error + + // EntryRecorder stores Raft log entries for testing + EntryRecorder *EntryRecorder +} + +// applyOptions creates and validates manager options by applying provided option functions +// to a default configuration. +func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ManagerOptions, error) { + baseRTT := time.Duration(raftCfg.RTTMilliseconds) * time.Millisecond + options := ManagerOptions{ + // Default readyTimeout is 5 times the election timeout to allow for initial self-elections + readyTimeout: 5 * time.Duration(raftCfg.ElectionTicks) * baseRTT, + } + + for _, opt := range opts { + options = opt(options) + } + + if options.readyTimeout == 0 { + return options, fmt.Errorf("readyTimeout must not be zero") + } else if options.readyTimeout < time.Duration(raftCfg.ElectionTicks)*baseRTT { + return options, fmt.Errorf("readyTimeout must not be less than election timeout") + } + + return options, nil +} + +// NewManager creates a new Raft Manager instance. +func NewManager( + authorityName string, + partitionID storage.PartitionID, + raftCfg config.Raft, + raftStorage *Storage, + logger logging.Logger, + opts ...OptionFunc, +) (*Manager, error) { + if !raftCfg.Enabled { + return nil, fmt.Errorf("raft is not enabled") + } + + options, err := applyOptions(raftCfg, opts) + if err != nil { + return nil, fmt.Errorf("invalid raft manager option: %w", err) + } + + logger = logger.WithFields(logging.Fields{ + "component": "raft", + "raft.authority": authorityName, + "raft.partition": partitionID, + }) + + return &Manager{ + authorityName: authorityName, + ptnID: partitionID, + raftCfg: raftCfg, + options: options, + storage: raftStorage, + logger: logger, + registry: NewRegistry(), + syncer: safe.NewSyncer(), + leadership: NewLeadership(), + ready: &ready{c: make(chan error, 1)}, + notifyQueue: make(chan error, 1), + EntryRecorder: options.entryRecorder, + }, nil +} + +// Initialize starts the Raft manager by: +// - Loading or bootstrapping the Raft state +// - Initializing the etcd/raft Node +// - Starting the processing goroutine +// +// The appliedLSN parameter indicates the last log sequence number that was fully applied +// to the partition's state. This ensures that Raft processing begins from the correct point +// in the log history. 
+func (mgr *Manager) Initialize(ctx context.Context, appliedLSN storage.LSN) error {
+	mgr.mutex.Lock()
+	defer mgr.mutex.Unlock()
+
+	if mgr.started {
+		return fmt.Errorf("raft manager for partition %q already started", mgr.ptnID)
+	}
+	mgr.started = true
+
+	mgr.ctx, mgr.cancel = context.WithCancel(ctx)
+
+	bootstrapped, err := mgr.storage.initialize(ctx, appliedLSN)
+	if err != nil {
+		return fmt.Errorf("failed to load raft initial state: %w", err)
+	}
+
+	// etcd/raft uses an integer ID to identify a member of a group. This ID is incremented whenever a new member
+	// joins the group. This node ID system yields some benefits:
+	// - No need to set the node ID statically, avoiding the need for a composite key of the storage
+	// name and node ID.
+	// - No need for a global node registration system, as IDs are ephemeral.
+	// - Works better in scenarios where a member leaves and then re-joins the cluster. Each joining event leads to
+	// a new unique node ID.
+	// Currently, Gitaly only supports single-node Raft clusters, so we use a fixed node ID of 1. In a future
+	// implementation of multi-node clusters, each node will get a unique ID when joining the cluster.
+	// https://gitlab.com/gitlab-org/gitaly/-/issues/6304 tracks the work to bootstrap new cluster members.
+	var nodeID uint64 = 1
+
+	config := &raft.Config{
+		ID: nodeID,
+		ElectionTick: int(mgr.raftCfg.ElectionTicks),
+		HeartbeatTick: int(mgr.raftCfg.HeartbeatTicks),
+		Storage: mgr.storage,
+		MaxSizePerMsg: defaultMaxSizePerMsg,
+		MaxInflightMsgs: defaultMaxInflightMsgs,
+		Logger: &raftLogger{logger: mgr.logger.WithFields(logging.Fields{"raft.component": "manager"})},
+		// We disable automatic proposal forwarding provided by etcd/raft because it would bypass Gitaly's
+		// transaction validation system. In Gitaly, each transaction is verified against the latest state
+		// before being committed. If proposal forwarding is enabled, replica nodes would have the ability to
+		// start transactions independently and propose them to the leader for commit.
+		//
+		// Replica: A -> B -> C -> Start D ------------> Forward to Leader ----|
+		// Leader: A -> B -> C -> Start E -> Commit E ----------------------> Receive D -> Commit D
+		// In this scenario, D is not verified against E even though E commits before D.
+		//
+		// Instead, we'll implement explicit request routing at the RPC layer to ensure all writes go through
+		// proper verification on the leader.
+		// See https://gitlab.com/gitlab-org/gitaly/-/issues/6465
+		DisableProposalForwarding: true,
+	}
+
+	if !bootstrapped {
+		// For first-time bootstrap, initialize with self as the only peer
+		peers := []raft.Peer{{ID: nodeID}}
+		mgr.node = raft.StartNode(config, peers)
+	} else {
+		// For restarts, set Applied to latest committed LSN
+		// WAL considers entries committed once they are in the Raft log
+		config.Applied = uint64(mgr.storage.committedLSN)
+		mgr.node = raft.RestartNode(config)
+	}
+
+	go mgr.run(bootstrapped)
+
+	select {
+	case <-time.After(mgr.options.readyTimeout):
+		return ErrReadyTimeout
+	case err := <-mgr.ready.c:
+		return err
+	}
+}
+
+// run executes the main Raft event loop, processing ticks, ready states, and notifications.
+func (mgr *Manager) run(bootstrapped bool) { + mgr.wg.Add(1) + defer mgr.wg.Done() + + ticker := time.NewTicker(time.Duration(mgr.raftCfg.RTTMilliseconds) * time.Millisecond) + defer ticker.Stop() + + // For bootstrapped clusters, mark ready immediately since state is already established + // For new clusters, wait for first config change + if bootstrapped { + mgr.signalReady() + } + + // Main event processing loop + for { + select { + case <-ticker.C: + // Drive the etcd/raft internal clock + // Election and timeout depend on tick count + mgr.node.Tick() + case rd, ok := <-mgr.node.Ready(): + if err := mgr.safeExec(func() error { + if !ok { + return fmt.Errorf("raft node Ready channel unexpectedly closed") + } + if err := mgr.handleReady(&rd); err != nil { + return err + } + mgr.node.Advance() + return nil + }); err != nil { + mgr.handleFatalError(err) + return + } + case err := <-mgr.storage.localLog.GetNotificationQueue(): + // Forward storage notifications + if err == nil { + select { + case mgr.notifyQueue <- nil: + default: + // Non-critical if we can't send a nil notification + } + } else { + mgr.handleFatalError(err) + return + } + + case <-mgr.ctx.Done(): + err := mgr.ctx.Err() + if !errors.Is(err, context.Canceled) { + mgr.handleFatalError(err) + } + return + } + } +} + +// safeExec executes a function and recovers from panics, converting them to errors +func (mgr *Manager) safeExec(fn func() error) (err error) { + defer func() { + if r := recover(); r != nil { + switch v := r.(type) { + case error: + err = fmt.Errorf("panic recovered: %w", v) + default: + err = fmt.Errorf("panic recovered: %v", r) + } + // Capture stack trace for debugging + stack := make([]byte, 4096) + stack = stack[:runtime.Stack(stack, false)] + + err := fmt.Errorf("raft manager panic: %v", r) + mgr.logger.WithError(err).WithField("error.stack", string(stack)).Error("raft manager panic recovered") + } + }() + + return fn() +} + +// handleFatalError handles a fatal error that requires the run loop to terminate +func (mgr *Manager) handleFatalError(err error) { + // Set back to ready to unlock the caller of Initialize(). + mgr.signalError(ErrManagerStopped) + + mgr.logger.WithError(err).Error("raft event loop failed") + + // Unlock all waiters of AppendLogEntry about the manager being stopped. + mgr.registry.UntrackAll(ErrManagerStopped) + + // Ensure error is sent to notification queue. + mgr.notifyQueue <- err +} + +// Close gracefully shuts down the Raft manager and its components. +func (mgr *Manager) Close() error { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + if !mgr.started { + return nil + } + + mgr.node.Stop() + mgr.cancel() + mgr.wg.Wait() + + return mgr.storage.close() +} + +// GetNotificationQueue returns the channel used to notify external components of changes. +func (mgr *Manager) GetNotificationQueue() <-chan error { + return mgr.notifyQueue +} + +// GetEntryPath returns the filesystem path for a given log entry. +func (mgr *Manager) GetEntryPath(lsn storage.LSN) string { + return mgr.storage.localLog.GetEntryPath(lsn) +} + +// AcknowledgePosition marks log entries up to and including the given LSN +// as successfully processed for the specified position type. Raft manager +// doesn't handle this directly. It propagates to the local log manager. +func (mgr *Manager) AcknowledgePosition(t storage.PositionType, lsn storage.LSN) error { + return mgr.storage.localLog.AcknowledgePosition(t, lsn) +} + +// AppendedLSN returns the LSN of the most recently appended log entry. 
+func (mgr *Manager) AppendedLSN() storage.LSN { + return mgr.storage.committedLSN +} + +// LowWaterMark returns the earliest LSN that should be retained. +// Log entries before this LSN can be safely removed. +func (mgr *Manager) LowWaterMark() storage.LSN { + lsn, _ := mgr.storage.FirstIndex() + return storage.LSN(lsn) +} + +// AppendLogEntry proposes a new log entry to the cluster. +// It blocks until the entry is committed, timeout occurs, or the cluster rejects it. +func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { + mgr.wg.Add(1) + defer mgr.wg.Done() + + w := mgr.registry.Register() + defer mgr.registry.Untrack(w.ID) + + message := &gitalypb.RaftEntry{ + Id: uint64(w.ID), + Data: &gitalypb.RaftEntry_LogData{ + LocalPath: []byte(logEntryPath), + }, + } + data, err := proto.Marshal(message) + if err != nil { + return 0, fmt.Errorf("marshaling Raft message: %w", err) + } + + ctx := mgr.ctx + + // Set an optional timeout to prevent proposal processing takes forever. This option is + // more useful in testing environments. + if mgr.options.opTimeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(mgr.ctx, mgr.options.opTimeout) + defer cancel() + } + + if err := mgr.node.Propose(ctx, data); err != nil { + return 0, fmt.Errorf("proposing Raft message: %w", err) + } + + select { + case <-ctx.Done(): + return 0, ctx.Err() + case err := <-w.C: + return w.LSN, err + } +} + +// CompareAndAppendLogEntry is a variant of AppendLogEntry. It appends the log entry to the write-ahead log if and only +// if the inserting position matches the expected LSN. Raft manager doesn't implement this method. The LSN is allocated +// by the underlying Raft engine. It cannot guarantee the inserted LSN matches before actual insertion. +func (mgr *Manager) CompareAndAppendLogEntry(lsn storage.LSN, logEntryPath string) (storage.LSN, error) { + return 0, fmt.Errorf("raft manager does not support CompareAndAppendLogEntry") +} + +// DeleteLogEntry deletes the log entry at the given LSN from the log. Raft manager doesn't support log entry deletion. +// After an entry is persisted, it is then sent to other members in the raft group for acknowledgement. There is no good +// way to withdraw this submission. +func (mgr *Manager) DeleteLogEntry(lsn storage.LSN) error { + return fmt.Errorf("raft manager does not support DeleteLogEntry") +} + +// NotifyNewEntries signals to the notification queue that a newly written log entry is available for consumption. +func (mgr *Manager) NotifyNewEntries() { + mgr.notifyQueue <- nil +} + +// handleReady processes the next state signaled by etcd/raft through three main steps: +// 1. Persist states (SoftState, HardState, and uncommitted Entries) +// 2. Send messages to other members via Transport +// 3. Process committed entries (entries acknowledged by the majority) +// See: https://pkg.go.dev/go.etcd.io/etcd/raft/v3#section-readme +func (mgr *Manager) handleReady(rd *raft.Ready) error { + // Handle volatile state updates for leadership tracking and observability + if err := mgr.handleSoftState(rd); err != nil { + return fmt.Errorf("handling soft state: %w", err) + } + + // In https://pkg.go.dev/go.etcd.io/etcd/raft/v3#section-readme, the + // library recommends saving entries, hard state, and snapshots + // atomically. 
We can't currently do this because we use different + // backends to persist these items: + // - entries are appended to WAL + // - hard state is persisted to the KV DB + // - snapshots are written to a directory + + // Persist new log entries to disk. These entries are not yet committed + // and may be superseded by entries with the same LSN but higher term. + // WAL will clean up any overlapping entries. + if err := mgr.saveEntries(rd); err != nil { + return fmt.Errorf("saving entries: %w", err) + } + + // Persist essential state needed for crash recovery + if err := mgr.handleHardState(rd); err != nil { + return fmt.Errorf("handling hard state: %w", err) + } + + // Send messages to other members in the cluster. + // Note: While the Raft thesis (section 10.2) suggests pipelining this step + // for parallel processing after disk persistence, our WAL currently serializes + // transactions. This optimization may become relevant when WAL supports + // concurrent transaction processing. + // Reference: https://github.com/ongardie/dissertation/blob/master/stanford.pdf + // + // The current implementation does not include Raft snapshotting or log compaction. + // This means the log will grow indefinitely until manually truncated. + // + // In a future implementation, periodic snapshots will allow the log to be trimmed + // by removing entries that have been incorporated into a snapshot. + // See https://gitlab.com/gitlab-org/gitaly/-/issues/6463 + if err := mgr.sendMessages(rd); err != nil { + return fmt.Errorf("sending messages: %w", err) + } + + // Process committed entries in WAL. In single-node clusters, entries will be + // committed immediately without network communication since there's no need for + // consensus with other members. + if err := mgr.processCommitEntries(rd); err != nil { + return fmt.Errorf("processing committed entries: %w", err) + } + return nil +} + +// saveEntries persists new log entries to storage and handles their recording if enabled. 
+func (mgr *Manager) saveEntries(rd *raft.Ready) error { + if len(rd.Entries) == 0 { + return nil + } + + // Remove in-flight events with duplicate LSNs but lower terms + // WAL will clean up corresponding entries on disk + // Events without LSNs are preserved as they haven't reached this stage + firstLSN := storage.LSN(rd.Entries[0].Index) + mgr.registry.UntrackSince(firstLSN, ErrObsoleted) + + for i := range rd.Entries { + lsn := storage.LSN(rd.Entries[i].Index) + + switch rd.Entries[i].Type { + case raftpb.EntryNormal: + if len(rd.Entries[i].Data) == 0 { + // Handle empty entries (typically internal Raft entries) + if err := mgr.storage.draftLogEntry(rd.Entries[i], func(w *wal.Entry) error { + return nil + }); err != nil { + return fmt.Errorf("inserting config change log entry: %w", err) + } + if err := mgr.recordEntryIfNeeded(true, lsn); err != nil { + return fmt.Errorf("recording log entry: %w", err) + } + } else { + // Handle normal entries containing RaftMessage data + var message gitalypb.RaftEntry + if err := proto.Unmarshal(rd.Entries[i].Data, &message); err != nil { + return fmt.Errorf("unmarshalling entry type: %w", err) + } + + logEntryPath := string(message.GetData().GetLocalPath()) + if err := mgr.storage.insertLogEntry(rd.Entries[i], logEntryPath); err != nil { + return fmt.Errorf("appending log entry: %w", err) + } + if err := mgr.recordEntryIfNeeded(false, lsn); err != nil { + return fmt.Errorf("recording log entry: %w", err) + } + + mgr.registry.AssignLSN(EventID(message.GetId()), lsn) + } + case raftpb.EntryConfChange, raftpb.EntryConfChangeV2: + // Handle configuration change entries + if err := mgr.storage.draftLogEntry(rd.Entries[i], func(w *wal.Entry) error { + marshaledValue, err := proto.Marshal(lsn.ToProto()) + if err != nil { + return fmt.Errorf("marshal value: %w", err) + } + w.SetKey(KeyLastConfigChange, marshaledValue) + return nil + }); err != nil { + return fmt.Errorf("inserting config change log entry: %w", err) + } + if err := mgr.recordEntryIfNeeded(true, lsn); err != nil { + return fmt.Errorf("recording log entry: %w", err) + } + default: + return fmt.Errorf("raft entry type not supported: %s", rd.Entries[i].Type) + } + } + return nil +} + +// processCommitEntries processes entries that have been committed by the Raft consensus +// and updates the system state accordingly. +func (mgr *Manager) processCommitEntries(rd *raft.Ready) error { + for i := range rd.CommittedEntries { + var shouldNotify bool + + switch rd.CommittedEntries[i].Type { + case raftpb.EntryNormal: + var message gitalypb.RaftEntry + if err := proto.Unmarshal(rd.CommittedEntries[i].Data, &message); err != nil { + return fmt.Errorf("unmarshalling entry type: %w", err) + } + + // Notification logic: + // 1. For internal entries (those NOT tracked in the registry), we notify because + // the caller isn't aware of these automatically generated entries + // 2. For caller-issued entries (those tracked in the registry), we don't notify + // since the caller already knows about these entries + // The Untrack() method returns true for tracked entries and the unlocks waiting channel in + // AppendLogEntry(). 
Callers must handle concurrent modifications appropriately
+			shouldNotify = !mgr.registry.Untrack(EventID(message.GetId()))
+
+		case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
+			if err := mgr.processConfChange(rd.CommittedEntries[i]); err != nil {
+				return fmt.Errorf("processing config change: %w", err)
+			}
+			shouldNotify = true
+
+		default:
+			return fmt.Errorf("raft entry type not supported: %s", rd.CommittedEntries[i].Type)
+		}
+
+		if shouldNotify {
+			select {
+			case mgr.notifyQueue <- nil:
+			default:
+			}
+		}
+	}
+	return nil
+}
+
+// processConfChange processes committed config change entries.
+func (mgr *Manager) processConfChange(entry raftpb.Entry) error {
+	var cc raftpb.ConfChangeI
+	if entry.Type == raftpb.EntryConfChange {
+		var cc1 raftpb.ConfChange
+		if err := cc1.Unmarshal(entry.Data); err != nil {
+			return fmt.Errorf("unmarshalling EntryConfChange: %w", err)
+		}
+		cc = cc1
+	} else {
+		var cc2 raftpb.ConfChangeV2
+		if err := cc2.Unmarshal(entry.Data); err != nil {
+			return fmt.Errorf("unmarshalling EntryConfChangeV2: %w", err)
+		}
+		cc = cc2
+	}
+
+	confState := mgr.node.ApplyConfChange(cc)
+	if err := mgr.storage.saveConfState(*confState); err != nil {
+		return fmt.Errorf("saving config state: %w", err)
+	}
+
+	// Signal readiness after the first config change. Applies only to new clusters that have not been bootstrapped. Not
+	// needed for subsequent restarts.
+	mgr.signalReady()
+	return nil
+}
+
+// sendMessages delivers pending Raft messages to other members via the transport layer.
+func (mgr *Manager) sendMessages(rd *raft.Ready) error {
+	if len(rd.Messages) > 0 {
+		// This code path will be properly implemented when network communication is added
+		// See https://gitlab.com/gitlab-org/gitaly/-/issues/6304
+		return fmt.Errorf("networking for raft cluster is not implemented yet")
+	}
+	return nil
+}
+
+// handleSoftState processes changes to volatile state like leadership and logs significant changes.
+func (mgr *Manager) handleSoftState(rd *raft.Ready) error {
+	state := rd.SoftState
+	if state == nil {
+		return nil
+	}
+	prevLeader := mgr.leadership.GetLeaderID()
+	changed, duration := mgr.leadership.SetLeader(state.Lead, state.RaftState == raft.StateLeader)
+
+	if changed {
+		mgr.logger.WithFields(logging.Fields{
+			"raft.leader_id": mgr.leadership.GetLeaderID(),
+			"raft.is_leader": mgr.leadership.IsLeader(),
+			"raft.previous_leader_id": prevLeader,
+			"raft.leadership_duration": duration,
+		}).Info("leadership updated")
+	}
+	return nil
+}
+
+// handleHardState persists critical Raft state required for crash recovery.
+func (mgr *Manager) handleHardState(rd *raft.Ready) error {
+	if raft.IsEmptyHardState(rd.HardState) {
+		return nil
+	}
+	if err := mgr.storage.saveHardState(rd.HardState); err != nil {
+		return fmt.Errorf("saving hard state: %w", err)
+	}
+	return nil
+}
+
+// recordEntryIfNeeded records log entries when entry recording is enabled,
+// typically used for testing and debugging.
+func (mgr *Manager) recordEntryIfNeeded(fromRaft bool, lsn storage.LSN) error { + if mgr.EntryRecorder != nil { + logEntry, err := mgr.storage.readLogEntry(lsn) + if err != nil { + return fmt.Errorf("reading log entry: %w", err) + } + mgr.EntryRecorder.Record(fromRaft, lsn, logEntry) + } + return nil +} + +func (mgr *Manager) signalReady() { + mgr.ready.set(nil) +} + +func (mgr *Manager) signalError(err error) { + mgr.ready.set(err) +} + +var _ = (storage.LogManager)(&Manager{}) diff --git a/internal/gitaly/storage/raftmgr/manager_test.go b/internal/gitaly/storage/raftmgr/manager_test.go new file mode 100644 index 00000000000..9d8e0e29f8d --- /dev/null +++ b/internal/gitaly/storage/raftmgr/manager_test.go @@ -0,0 +1,590 @@ +package raftmgr + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" +) + +func raftConfigsForTest(t *testing.T) config.Raft { + // Speed up initial election overhead in the test setup + return config.Raft{ + Enabled: true, + ClusterID: "test-cluster", + ElectionTicks: 5, + HeartbeatTicks: 2, + RTTMilliseconds: 100, + SnapshotDir: testhelper.TempDir(t), + } +} + +func TestManager_Initialize(t *testing.T) { + t.Parallel() + + t.Run("succeeds when raft is enabled", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + recorder := NewEntryRecorder() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder)) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + + // Verify that the manager is properly initialized + require.True(t, mgr.started) + require.NotNil(t, mgr.node) + + // Verify that the first config change is recorded + // After initialization, Raft typically creates a config change + // entry to establish the initial configuration + require.Eventually(t, func() bool { + return recorder.Len() > 0 + }, 5*time.Second, 10*time.Millisecond, "expected at least one entry to be recorded") + + // Verify at least one entry from Raft was recorded (typically a config change) + raftEntries := recorder.FromRaft() + require.NotEmpty(t, raftEntries, "expected at least one Raft entry after initialization") + + // Close the manager + require.NoError(t, mgr.Close()) + }) + + t.Run("fails when raft is not enabled", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + raftCfg.Enabled = false 
+ + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + // Configure manager with Raft disabled + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.Nil(t, mgr) + require.ErrorContains(t, err, "raft is not enabled") + }) + + t.Run("fails when manager is reused", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.NoError(t, err) + + // First initialization should succeed + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + + // Second initialization should fail + err = mgr.Initialize(ctx, 0) + require.EqualError(t, err, fmt.Sprintf("raft manager for partition %q already started", partitionID)) + + require.NoError(t, mgr.Close()) + }) +} + +func TestManager_AppendLogEntry(t *testing.T) { + t.Parallel() + + setup := func(t *testing.T, ctx context.Context, cfg config.Cfg) (*Manager, *EntryRecorder) { + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + recorder := NewEntryRecorder() + + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder)) + require.NoError(t, err) + + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + + return mgr, recorder + } + + createLogEntry := func(t *testing.T, ctx context.Context, content string) string { + entryDir := testhelper.TempDir(t) + entry := wal.NewEntry(entryDir) + entry.SetKey([]byte("test-key"), []byte(content)) + + // Create a few files in the directory to simulate actual log entry data + for i := 1; i <= 3; i++ { + filePath := filepath.Join(entryDir, fmt.Sprintf("file-%d", i)) + require.NoError(t, os.WriteFile(filePath, []byte(content), 0o644)) + } + + // Write the manifest + require.NoError(t, wal.WriteManifest(ctx, entryDir, &gitalypb.LogEntry{ + Operations: entry.Operations(), + })) + + return entryDir + } + + t.Run("append single log entry", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + mgr, recorder := setup(t, ctx, cfg) + defer func() { + require.NoError(t, mgr.Close()) + }() + + logEntryPath := createLogEntry(t, ctx, "test-content-1") + lsn, err := 
mgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + require.Greater(t, lsn, storage.LSN(0), "expected a valid LSN") + + // Verify that the log entry was recorded + require.Eventually(t, func() bool { + // The entry might be recorded with an offset due to Raft internal entries + return recorder.Len() >= 3 + }, 5*time.Second, 10*time.Millisecond, "expected log entry to be recorded") + + // Check that our entry is not marked as coming from Raft + require.False(t, recorder.IsFromRaft(lsn), "expected user-submitted entry not to be from Raft") + + // Verify entry content + logEntry, err := wal.ReadManifest(mgr.GetEntryPath(lsn)) + require.NoError(t, err) + require.NotNil(t, logEntry) + require.Len(t, logEntry.GetOperations(), 1) + require.Equal(t, []byte("test-key"), logEntry.GetOperations()[0].GetSetKey().GetKey()) + require.Equal(t, []byte("test-content-1"), logEntry.GetOperations()[0].GetSetKey().GetValue()) + }) + + t.Run("append multiple log entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + mgr, recorder := setup(t, ctx, cfg) + defer func() { + require.NoError(t, mgr.Close()) + }() + + // Create and append multiple log entries + var lsns []storage.LSN + + for i := 1; i <= 3; i++ { + logEntryPath := createLogEntry(t, ctx, fmt.Sprintf("test-content-%d", i)) + lsn, err := mgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + lsns = append(lsns, lsn) + } + + // Verify that all log entries were recorded + require.Eventually(t, func() bool { + return recorder.Len() >= 3 + }, 5*time.Second, 10*time.Millisecond, "expected all log entries to be recorded") + + // Verify entries are in order + require.IsIncreasing(t, lsns, "expected increasing LSNs") + + for i := 0; i < 3; i++ { + logEntry, err := wal.ReadManifest(mgr.GetEntryPath(lsns[i])) + require.NoError(t, err) + require.NotNil(t, logEntry) + require.Len(t, logEntry.GetOperations(), 1) + require.Equal(t, []byte("test-key"), logEntry.GetOperations()[0].GetSetKey().GetKey()) + require.Equal(t, []byte(fmt.Sprintf("test-content-%d", i+1)), logEntry.GetOperations()[0].GetSetKey().GetValue()) + } + }) + + t.Run("append multiple log entries concurrently", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + mgr, _ := setup(t, ctx, cfg) + defer func() { + require.NoError(t, mgr.Close()) + }() + + // Number of concurrent entries to append. We use a gate so that all log entries are appended at the + // same time. We need to verify if the event loop works correctly when handling multiple entries in the + // same batch. However, the order is non-deterministic. There's also no guarantee (although very likely) + // a batch contains more than one entry. 
+ numEntries := 20 + var wg sync.WaitGroup + wg.Add(numEntries) + + // Create a starting gate to coordinate concurrent execution + startGate := make(chan struct{}) + + // Store results + results := make(chan struct { + lsn storage.LSN + err error + idx int + }, numEntries) + + // Launch goroutines to append entries concurrently + for i := 1; i <= numEntries; i++ { + go func(idx int) { + defer wg.Done() + + // Wait for the starting gate + <-startGate + + // Create and append log entry + logEntryPath := createLogEntry(t, ctx, fmt.Sprintf("test-content-%d", idx)) + lsn, err := mgr.AppendLogEntry(logEntryPath) + + // Send the result back + results <- struct { + lsn storage.LSN + err error + idx int + }{lsn, err, idx} + }(i) + } + + // Start all goroutines at once + close(startGate) + + // Collect all results + var lsns []storage.LSN + lsnMap := make(map[int]storage.LSN) // Maps index to LSN for content verification + + wg.Wait() + close(results) + + for res := range results { + require.NoError(t, res.err, "AppendLogEntry should not fail for entry %d", res.idx) + require.Greater(t, res.lsn, storage.LSN(0), "should return a valid LSN for entry %d", res.idx) + lsns = append(lsns, res.lsn) + lsnMap[res.idx] = res.lsn + } + + // Verify entries are ordered when sorted + sortedLSNs := make([]storage.LSN, len(lsns)) + copy(sortedLSNs, lsns) + sort.Slice(sortedLSNs, func(i, j int) bool { + return sortedLSNs[i] < sortedLSNs[j] + }) + + require.IsIncreasing(t, sortedLSNs, "LSNs should be unique and increasing") + + // Verify each entry's content matches its index + for idx, lsn := range lsnMap { + logEntry, err := wal.ReadManifest(mgr.GetEntryPath(lsn)) + require.NoError(t, err) + require.NotNil(t, logEntry) + require.Len(t, logEntry.GetOperations(), 1) + require.Equal(t, []byte("test-key"), logEntry.GetOperations()[0].GetSetKey().GetKey()) + require.Equal(t, []byte(fmt.Sprintf("test-content-%d", idx)), logEntry.GetOperations()[0].GetSetKey().GetValue()) + } + }) + + t.Run("operation timeout", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + recorder := NewEntryRecorder() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + // Create manager with very short operation timeout + mgr, err := NewManager( + storageName, + partitionID, + raftCfg, + raftStorage, + logger, + WithEntryRecorder(recorder), + WithOpTimeout(1*time.Nanosecond), // Set a very short timeout to force failure + ) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + defer func() { + require.NoError(t, mgr.Close()) + }() + + // Attempting to append should time out + logEntryPath := createLogEntry(t, ctx, "timeout-test") + _, err = mgr.AppendLogEntry(logEntryPath) + require.Error(t, err) + require.Contains(t, err.Error(), "context deadline exceeded") + }) + + t.Run("context canceled", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + cancelCtx, cancel := context.WithCancel(testhelper.Context(t)) + mgr, _ := setup(t, cancelCtx, cfg) 
+ defer func() { + require.NoError(t, mgr.Close()) + }() + + // Cancel the context before append + cancel() + + // Attempt to append should fail with context canceled + logEntryPath := createLogEntry(t, ctx, "cancel-test") + _, err := mgr.AppendLogEntry(logEntryPath) + require.Error(t, err) + require.Contains(t, err.Error(), "context canceled") + }) +} + +func TestManager_Close(t *testing.T) { + t.Parallel() + + t.Run("close initialized manager", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := config.Raft{ + Enabled: true, + RTTMilliseconds: 100, + ElectionTicks: 10, + HeartbeatTicks: 1, + SnapshotDir: testhelper.TempDir(t), + } + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + + // Close the manager + err = mgr.Close() + require.NoError(t, err, "expected Close to succeed") + + // Second close should still work (idempotent) + err = mgr.Close() + require.NoError(t, err, "expected second Close to succeed") + }) + + t.Run("close uninitialized manager", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := config.Raft{ + Enabled: true, + RTTMilliseconds: 100, + ElectionTicks: 10, + HeartbeatTicks: 1, + SnapshotDir: testhelper.TempDir(t), + } + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.NoError(t, err) + + // Close without initializing + err = mgr.Close() + require.NoError(t, err, "expected Close to succeed even without initialization") + }) + + t.Run("verify raft internal entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + recorder := NewEntryRecorder() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder)) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + defer func() { + 
require.NoError(t, mgr.Close()) + }() + + // Get entries generated by Raft internal processes + raftEntries := recorder.FromRaft() + require.NotEmpty(t, raftEntries, "expected some internal entries generated by Raft") + + // Verify that at least one entry is a config change (usually the first one) + foundConfigChange := false + for _, entry := range raftEntries { + // Look for entries that might be related to configuration + for _, op := range entry.GetOperations() { + if op.GetSetKey() != nil && string(op.GetSetKey().GetKey()) == string(KeyLastConfigChange) { + foundConfigChange = true + break + } + } + if foundConfigChange { + break + } + } + require.True(t, foundConfigChange, "expected to find at least one config change entry") + }) +} + +func TestManager_NotImplementedLogMethods(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + // Configure manager with Raft enabled + raftCfg := config.Raft{ + Enabled: true, + RTTMilliseconds: 100, + ElectionTicks: 10, + HeartbeatTicks: 1, + SnapshotDir: testhelper.TempDir(t), + } + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + require.NoError(t, err) + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + defer func() { + require.NoError(t, mgr.Close()) + }() + + // Test CompareAndAppendLogEntry - should not be implemented + _, err = mgr.CompareAndAppendLogEntry(1, "/path/to/log") + require.ErrorContains(t, err, "raft manager does not support CompareAndAppendLogEntry") + + // Test DeleteLogEntry - should not be implemented + err = mgr.DeleteLogEntry(1) + require.ErrorContains(t, err, "raft manager does not support DeleteLogEntry") +} diff --git a/internal/gitaly/storage/raftmgr/testhelper_test.go b/internal/gitaly/storage/raftmgr/testhelper_test.go index 8dd0ae417e8..6c24a024511 100644 --- a/internal/gitaly/storage/raftmgr/testhelper_test.go +++ b/internal/gitaly/storage/raftmgr/testhelper_test.go @@ -21,6 +21,7 @@ func TestMain(m *testing.M) { } type mockRaftManager struct { + RaftManager logger logger.LogrusLogger transport Transport logManager storage.LogManager -- GitLab From 90e1dc1596a51b2d485b9f75915b7c7a72363810 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 26 Feb 2025 13:25:03 +0700 Subject: [PATCH 04/10] raft: Add crash and recovery test scenarios The previous commit covers a long list of testing scenarios for Rat manager. They are considered to be happy-path. To increase the confidence of the new Raft manager, it must be asserted under unhappy paths, especially the ability to recover correctly after a crash. Unfortunately, etcd/raft doesn't provide any intercepting points for us to inject chaos events. Thus, this commit adds some test hooks to the implementation of raftmgr.Manager and raftmgr.Storage. They help simulate different points of failures during the life cycle of an operation handled by etcd/raft engine. This commit also adds a notable amount of tests using aforementioned hooks. 
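As a condensed sketch of how a test drives these hooks (distilled from the scenarios added to manager_test.go below, using the new testHooks fields):

    // Simulate a crash just before the entry is handed to etcd/raft.
    env.mgr.hooks.BeforePropose = func(logEntryPath string) {
        panic("simulated crash during propose")
    }

    // The append panics; a recovery manager initialized afterwards must
    // come back without the half-proposed entry.
    require.PanicsWithValue(t, "simulated crash during propose", func() {
        _, _ = env.mgr.AppendLogEntry(logEntryPath)
    })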
The current implementation solely focuses on single-node clusters. In this setting, some crashing scenarios yield the same result. For example, if the crash occurs after inserting a log entry into WAL, etcd/raft considers that log entry as committed after waking up. It is because there's only one member, hence there shouldn't be any conflict. In a more realistic setting involving multiple nodes, the result is vastly different. That log entry is sure to be discarded by the cluster. Networking is also not taken into account. Crashing during transporting messages externally leads to some interesting results. It will be covered in later iterations after integrating the networking stack. --- internal/gitaly/storage/raftmgr/hooks.go | 38 ++ internal/gitaly/storage/raftmgr/manager.go | 11 + .../gitaly/storage/raftmgr/manager_test.go | 607 ++++++++++++++++++ internal/gitaly/storage/raftmgr/storage.go | 8 + 4 files changed, 664 insertions(+) create mode 100644 internal/gitaly/storage/raftmgr/hooks.go diff --git a/internal/gitaly/storage/raftmgr/hooks.go b/internal/gitaly/storage/raftmgr/hooks.go new file mode 100644 index 00000000000..ec4ba9b7a9f --- /dev/null +++ b/internal/gitaly/storage/raftmgr/hooks.go @@ -0,0 +1,38 @@ +package raftmgr + +// testHooks defines insertion points for testing various stages of Raft operations. +type testHooks struct { + // BeforeInsertLogEntry is called before inserting a log entry at the specified index. + BeforeInsertLogEntry func(index uint64) + + // BeforeSaveHardState is called before persisting a new hard state. + BeforeSaveHardState func() + + // BeforePropose is called before proposing a new log entry with the given path. + BeforePropose func(path string) + + // BeforeProcessCommittedEntries is called before processing committed entries from a Ready state. + BeforeProcessCommittedEntries func() + + // BeforeNodeAdvance is called before advancing the Raft node's state. + BeforeNodeAdvance func() + + // BeforeSendMessages is called before sending messages to other Raft nodes. + BeforeSendMessages func() + + // BeforeHandleReady is called before processing a new Ready state from the Raft node. + BeforeHandleReady func() +} + +// noopHooks returns a Hooks instance with all hooks set to no-op functions. 
+func noopHooks() testHooks { + return testHooks{ + BeforeInsertLogEntry: func(uint64) {}, + BeforeSaveHardState: func() {}, + BeforePropose: func(string) {}, + BeforeProcessCommittedEntries: func() {}, + BeforeNodeAdvance: func() {}, + BeforeSendMessages: func() {}, + BeforeHandleReady: func() {}, + } +} diff --git a/internal/gitaly/storage/raftmgr/manager.go b/internal/gitaly/storage/raftmgr/manager.go index 0f778692773..d793e560178 100644 --- a/internal/gitaly/storage/raftmgr/manager.go +++ b/internal/gitaly/storage/raftmgr/manager.go @@ -131,6 +131,9 @@ type Manager struct { // EntryRecorder stores Raft log entries for testing EntryRecorder *EntryRecorder + + // hooks is a collection of hooks, used in test environment to intercept critical events + hooks testHooks } // applyOptions creates and validates manager options by applying provided option functions @@ -192,6 +195,7 @@ func NewManager( ready: &ready{c: make(chan error, 1)}, notifyQueue: make(chan error, 1), EntryRecorder: options.entryRecorder, + hooks: noopHooks(), }, nil } @@ -304,6 +308,7 @@ func (mgr *Manager) run(bootstrapped bool) { if err := mgr.handleReady(&rd); err != nil { return err } + mgr.hooks.BeforeNodeAdvance() mgr.node.Advance() return nil }); err != nil { @@ -444,6 +449,7 @@ func (mgr *Manager) AppendLogEntry(logEntryPath string) (storage.LSN, error) { defer cancel() } + mgr.hooks.BeforePropose(logEntryPath) if err := mgr.node.Propose(ctx, data); err != nil { return 0, fmt.Errorf("proposing Raft message: %w", err) } @@ -481,6 +487,8 @@ func (mgr *Manager) NotifyNewEntries() { // 3. Process committed entries (entries acknowledged by the majority) // See: https://pkg.go.dev/go.etcd.io/etcd/raft/v3#section-readme func (mgr *Manager) handleReady(rd *raft.Ready) error { + mgr.hooks.BeforeHandleReady() + // Handle volatile state updates for leadership tracking and observability if err := mgr.handleSoftState(rd); err != nil { return fmt.Errorf("handling soft state: %w", err) @@ -601,6 +609,8 @@ func (mgr *Manager) saveEntries(rd *raft.Ready) error { // processCommitEntries processes entries that have been committed by the Raft consensus // and updates the system state accordingly. func (mgr *Manager) processCommitEntries(rd *raft.Ready) error { + mgr.hooks.BeforeProcessCommittedEntries() + for i := range rd.CommittedEntries { var shouldNotify bool @@ -670,6 +680,7 @@ func (mgr *Manager) processConfChange(entry raftpb.Entry) error { // sendMessages delivers pending Raft messages to other members via the transport layer. 
func (mgr *Manager) sendMessages(rd *raft.Ready) error { + mgr.hooks.BeforeSendMessages() if len(rd.Messages) > 0 { // This code path will be properly implemented when network communication is added // See https://gitlab.com/gitlab-org/gitaly/-/issues/6304 diff --git a/internal/gitaly/storage/raftmgr/manager_test.go b/internal/gitaly/storage/raftmgr/manager_test.go index 9d8e0e29f8d..cb9658e5a00 100644 --- a/internal/gitaly/storage/raftmgr/manager_test.go +++ b/internal/gitaly/storage/raftmgr/manager_test.go @@ -7,12 +7,15 @@ import ( "path/filepath" "sort" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -138,6 +141,42 @@ func TestManager_Initialize(t *testing.T) { require.NoError(t, mgr.Close()) }) + + t.Run("fail waiting for raft group to be ready", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + storageName := cfg.Storages[0].Name + partitionID := storage.PartitionID(1) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg, logger) + posTracker := log.NewPositionTracker() + recorder := NewEntryRecorder() + + // Create a raft storage + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, posTracker, NewMetrics()) + require.NoError(t, err) + + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder)) + require.NoError(t, err) + + releaseReady := make(chan struct{}) + mgr.hooks.BeforeHandleReady = func() { + <-releaseReady + } + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.ErrorIs(t, err, ErrReadyTimeout) + + close(releaseReady) + require.NoError(t, mgr.Close()) + }) } func TestManager_AppendLogEntry(t *testing.T) { @@ -415,6 +454,574 @@ func TestManager_AppendLogEntry(t *testing.T) { }) } +func TestManager_AppendLogEntry_CrashRecovery(t *testing.T) { + t.Parallel() + + // testEnv encapsulates the test environment for raft manager crash tests + type testEnv struct { + mgr *Manager + db keyvalue.Transactioner + dbMgr *databasemgr.DBManager + stagingDir string + stateDir string + cfg config.Cfg + recorder *EntryRecorder + baseLSN storage.LSN + storageName string + partitionID storage.PartitionID + } + + // Helper to create a test log entry + createTestLogEntry := func(t *testing.T, ctx context.Context, content string) string { + t.Helper() + + entryDir := testhelper.TempDir(t) + entry := wal.NewEntry(entryDir) + entry.SetKey([]byte("test-key"), []byte(content)) + + // Write the manifest + require.NoError(t, wal.WriteManifest(ctx, entryDir, &gitalypb.LogEntry{ + Operations: entry.Operations(), + })) + + return entryDir + } + + // Helper for setting up a test environment + setupTest := func(t *testing.T, ctx context.Context, partitionID storage.PartitionID, setupFuncs ...func(*Manager)) testEnv { + t.Helper() + + cfg := testcfg.Build(t) + raftCfg := raftConfigsForTest(t) + logger := 
testhelper.NewLogger(t) + + storageName := cfg.Storages[0].Name + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + recorder := NewEntryRecorder() + + dbMgr := openTestDB(t, ctx, cfg, logger) + t.Cleanup(dbMgr.Close) + + db, err := dbMgr.GetDB(cfg.Storages[0].Name) + require.NoError(t, err) + + raftStorage, err := NewStorage(raftCfg, logger, storageName, partitionID, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker(), NewMetrics()) + require.NoError(t, err) + + // Configure manager + mgr, err := NewManager(storageName, partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(recorder)) + require.NoError(t, err) + + for _, f := range setupFuncs { + f(mgr) + } + + // Initialize the manager + err = mgr.Initialize(ctx, 0) + require.NoError(t, err) + + // Create a log entry and append it to establish a baseline + logEntryPath := createTestLogEntry(t, ctx, "base-content") + lsn, err := mgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + require.Greater(t, lsn, storage.LSN(0)) + + return testEnv{ + mgr: mgr, + db: db, + dbMgr: dbMgr, + stagingDir: stagingDir, + stateDir: stateDir, + cfg: cfg, + recorder: recorder, + baseLSN: lsn, + storageName: storageName, + partitionID: partitionID, + } + } + + // Helper to create a recovery manager -- a new instance of the Raft Manager that picks resumes from where the + // crashed manager left off. + createRecoveryManager := func(t *testing.T, ctx context.Context, env testEnv, lastLSN storage.LSN) *Manager { + t.Helper() + + logger := testhelper.NewLogger(t) + raftCfg := raftConfigsForTest(t) + + // Get a new DB connection from the existing DB manager + dbMgr := env.dbMgr + db, err := dbMgr.GetDB(env.cfg.Storages[0].Name) + require.NoError(t, err) + + raftStorage, err := NewStorage(raftCfg, logger, env.storageName, env.partitionID, db, env.stagingDir, env.stateDir, &mockConsumer{}, log.NewPositionTracker(), NewMetrics()) + require.NoError(t, err) + + recoveryMgr, err := NewManager(env.storageName, env.partitionID, raftCfg, raftStorage, logger, WithEntryRecorder(env.recorder)) + require.NoError(t, err) + + // Initialize with the last known LSN + err = recoveryMgr.Initialize(ctx, lastLSN) + require.NoError(t, err) + + return recoveryMgr + } + + // Helper to verify recovery when change is NOT expected to be persisted or retained + verifyNonPersistingRecovery := func(t *testing.T, ctx context.Context, recoveryMgr *Manager, baseLSN storage.LSN, crashContent string) { + t.Helper() + + // First append a new entry - this is crucial because it may trigger overwriting + // of any uncommitted entries that might exist from before the crash + logEntryPath := createTestLogEntry(t, ctx, "recovery-content") + newLSN, err := recoveryMgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + + // Get the recorder which contains all recorded entries + recorder := recoveryMgr.EntryRecorder + require.NotNil(t, recorder, "EntryRecorder must be configured for verification") + + // Check all entries in the recorder after the base LSN + // We should NOT find our crash content in any user (non-Raft) entries + crashEntryFound := false + + // Examine all entries from baseLSN+1 to newLSN + for lsn := baseLSN + 1; lsn <= newLSN; lsn++ { + // Skip Raft internal entries - we only care about user entries + if recorder.IsFromRaft(lsn) { + continue + } + + // For user entries, check if the content matches our crash content + entry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(lsn)) + if err != nil { + continue // Skip entries that 
can't be read + } + + // Check if this entry contains our crash content + for _, op := range entry.GetOperations() { + if op.GetSetKey() != nil && + string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == crashContent { + crashEntryFound = true + break + } + } + + if crashEntryFound { + break + } + } + + // Our crash content should NOT be persisted anywhere after recovery and new append + require.False(t, crashEntryFound, + "entry with crash content '%s' should not exist after recovery and a new append", crashContent) + + // Verify our recovery entry has the expected content + newEntry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(newLSN)) + require.NoError(t, err) + + // Check for the specific recovery content + recoveryContentFound := false + for _, op := range newEntry.GetOperations() { + if op.GetSetKey() != nil && + string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == "recovery-content" { + recoveryContentFound = true + break + } + } + require.True(t, recoveryContentFound, "recovery content should be found in new entry") + } + + // Helper to verify recovery when change IS expected to be persisted and retained + verifyPersistingRecovery := func(t *testing.T, ctx context.Context, recoveryMgr *Manager, baseLSN storage.LSN, crashContent string) { + t.Helper() + + // First append a new entry - this will tell us if the crash entry is truly committed + // If the crash entry wasn't actually committed, it would be overwritten now + logEntryPath := createTestLogEntry(t, ctx, "recovery-content") + newLSN, err := recoveryMgr.AppendLogEntry(logEntryPath) + require.NoError(t, err) + + // Get the recorder which contains all recorded entries + recorder := recoveryMgr.EntryRecorder + require.NotNil(t, recorder, "EntryRecorder must be configured for verification") + + // Now check all user entries to see if our crash content survived the append of a new entry + crashEntryLSN := storage.LSN(0) + + // Examine all entries from baseLSN+1 to newLSN + for lsn := baseLSN + 1; lsn < newLSN; lsn++ { + // Skip Raft internal entries - we only care about user entries + if recorder.IsFromRaft(lsn) { + continue + } + + // For user entries, check if the content matches our crash content + entry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(lsn)) + if err != nil { + continue // Skip entries that can't be read + } + + // Check if this entry contains our crash content + for _, op := range entry.GetOperations() { + if op.GetSetKey() != nil && + string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == crashContent { + crashEntryLSN = lsn + break + } + } + + if crashEntryLSN != 0 { + break + } + } + + // Our crash content should still be persisted somewhere even after a new append + require.NotEqual(t, storage.LSN(0), crashEntryLSN, + "committed entry with crash content '%s' should exist even after a new append", crashContent) + + // Verify our recovery entry is also there and has the expected content + newEntry, err := wal.ReadManifest(recoveryMgr.GetEntryPath(newLSN)) + require.NoError(t, err) + require.False(t, recorder.IsFromRaft(newLSN), "recovery entry should not be from Raft") + + // Check for the specific recovery content + recoveryContentFound := false + for _, op := range newEntry.GetOperations() { + if op.GetSetKey() != nil && + string(op.GetSetKey().GetKey()) == "test-key" && + string(op.GetSetKey().GetValue()) == "recovery-content" { + recoveryContentFound = true + break + } + } + require.True(t, 
recoveryContentFound, "recovery content should be found in new entry") + } + + // Register a cleanup function to close the DB manager at the end of each test + t.Cleanup(func() { + // The individual test cases will close their own managers and DB connections, + // but we need to ensure any shared DB managers are also closed at the end + }) + + t.Run("AppendLogEntry crash during propose", func(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + env := setupTest(t, ctx, storage.PartitionID(1)) + + // Set up hook to panic during propose + env.mgr.hooks.BeforePropose = func(logEntryPath string) { + panic("simulated crash during propose") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-propose" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + // Try to append - should panic + require.PanicsWithValue(t, "simulated crash during propose", func() { + _, _ = env.mgr.AppendLogEntry(logEntryPath) + }) + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change should NOT be persisted + verifyNonPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during commit entries", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(2)) + + // Set up hook to panic during commit entries + env.mgr.hooks.BeforeProcessCommittedEntries = func() { + panic("simulated crash during commit entries") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-commit" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + // Try to append - should fail with ErrManagerStopped + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during commit entries") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change SHOULD be persisted + // Even though client received an error, the entry was persisted before the crash + verifyPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during node advance", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + // The trigger prevents node advance from being triggered during first election. The hook must be + // inserted before Initialize(). Otherwise, we'll have a race. + var trigger atomic.Bool + env := setupTest(t, ctx, storage.PartitionID(3), func(mgr *Manager) { + mgr.hooks.BeforeNodeAdvance = func() { + if trigger.Load() { + panic("simulated crash during node advance") + } + } + }) + + // Create a test entry that will trigger the panic + crashContent := "crash-during-node-advance" + trigger.Store(true) + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + // Try to append - should return nil error since entry is committed before crash + lsn, err := env.mgr.AppendLogEntry(logEntryPath) + // At this point, the log entry is committed. 
Callers should receive the result + require.NoError(t, err, "client should receive success before the crash") + require.Greater(t, lsn, env.baseLSN, "should return valid LSN before crash") + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during node advance") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change SHOULD be persisted and client already got success + verifyPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during handle ready", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(4)) + + // Set up hook to panic during handle ready + env.mgr.hooks.BeforeHandleReady = func() { + panic("simulated crash during handle ready") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-handle-ready" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during handle ready") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change should NOT be persisted + // In a single-node setup, this behaves like a propose crash (change not persisted) + // In a multi-node setup, this could behave differently as the entry might already + // be replicated to other nodes before the crash + verifyNonPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry crash during insert log entry", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(5)) + + // Set up hook to panic during insert log entry + env.mgr.storage.hooks.BeforeInsertLogEntry = func(index uint64) { + panic("simulated crash during insert log entry") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-insert" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during insert log entry") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // Verify recovery - change should NOT be persisted + // In a single-node setup, this behaves like a propose crash (change not persisted) + // In a multi-node setup, this could behave differently as the entry might already + // be replicated to other nodes before the crash + verifyNonPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + 
t.Run("AppendLogEntry crash during save hard state", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(6)) + + // Set up hook to panic during save hard state + env.mgr.storage.hooks.BeforeSaveHardState = func() { + panic("simulated crash during save hard state") + } + + // Create a test entry that will trigger the panic + crashContent := "crash-during-save-hard-state" + logEntryPath := createTestLogEntry(t, ctx, crashContent) + + // In a single-node setup, this behaves like a propose crash (change not persisted) + // In a multi-node setup, this could behave differently as the entry might already + // be replicated to other nodes before the crash + _, err := env.mgr.AppendLogEntry(logEntryPath) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-env.mgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during save hard state") + + // Create recovery manager + require.NoError(t, env.mgr.Close()) + recoveryMgr := createRecoveryManager(t, ctx, env, env.baseLSN) + defer testhelper.MustClose(t, recoveryMgr) + + // When a crash occurs during saveHardState, the log entry is still persisted, unlike crashes + // during propose, handle ready, or insert log entry. This is because the entry is already + // physically written to disk in Storage.insertLogEntry (including fsync calls) before + // saveHardState is invoked. This behavior follows the guideline: + // https://pkg.go.dev/go.etcd.io/etcd/raft/v3#section-readme. + // The hard state update merely records metadata about what's committed, but doesn't affect the entry's + // persistence. During recovery, Raft will find the entry on disk even though the hard state wasn't + // updated to reflect it. In a single-node setup, this entry will be considered valid and retained + // because there's no conflicting entry with a higher term from another node. In multi-node setups, this + // behavior might differ as entries could be overwritten by entries with higher terms from the new + // leader. 
+ verifyPersistingRecovery(t, ctx, recoveryMgr, env.baseLSN, crashContent) + }) + + t.Run("AppendLogEntry multiple crash and recovery cycle", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + env := setupTest(t, ctx, storage.PartitionID(7)) + + // Keep track of LSNs for each successful append + lastKnownLSN := env.baseLSN + + // First recovery cycle - crash during propose + require.NoError(t, env.mgr.Close()) + firstRecoveryMgr := createRecoveryManager(t, ctx, env, lastKnownLSN) + + // Add one successful entry to advance the LSN + logEntryPath1 := createTestLogEntry(t, ctx, "first-recovery-success") + lsn1, err := firstRecoveryMgr.AppendLogEntry(logEntryPath1) + require.NoError(t, err) + require.Greater(t, lsn1, lastKnownLSN) + lastKnownLSN = lsn1 + + // Set up hook to crash during propose + firstRecoveryMgr.hooks.BeforePropose = func(logEntryPath string) { + panic("simulated crash during first recovery") + } + + // Attempt that will crash + logEntryPath2 := createTestLogEntry(t, ctx, "first-recovery-crash") + require.PanicsWithValue(t, "simulated crash during first recovery", func() { + _, _ = firstRecoveryMgr.AppendLogEntry(logEntryPath2) + }) + + // Close the manager only, keep the DB manager + require.NoError(t, firstRecoveryMgr.Close()) + + // Second recovery cycle - crash during commit + secondRecoveryMgr := createRecoveryManager(t, ctx, env, lastKnownLSN) + + // Add one successful entry to advance the LSN + logEntryPath3 := createTestLogEntry(t, ctx, "second-recovery-success") + lsn2, err := secondRecoveryMgr.AppendLogEntry(logEntryPath3) + require.NoError(t, err) + require.Greater(t, lsn2, lastKnownLSN) + lastKnownLSN = lsn2 + + // Set up hook to crash during commit + secondRecoveryMgr.hooks.BeforeProcessCommittedEntries = func() { + panic("simulated crash during second recovery") + } + + // Attempt that will crash + logEntryPath4 := createTestLogEntry(t, ctx, "second-recovery-crash") + _, err = secondRecoveryMgr.AppendLogEntry(logEntryPath4) + require.ErrorIs(t, err, ErrManagerStopped) + + var finalErr error + require.Eventually(t, func() bool { + finalErr = <-secondRecoveryMgr.GetNotificationQueue() + return finalErr != nil + }, 5*time.Second, 10*time.Millisecond) + require.ErrorContains(t, finalErr, "simulated crash during second recovery") + + // Close the manager only, keep the DB manager + require.NoError(t, secondRecoveryMgr.Close()) + + // Final recovery - verify system state after multiple crashes + finalRecoveryMgr := createRecoveryManager(t, ctx, env, lastKnownLSN) + + // For commit crash, the crashed entry should be persisted + require.Greater(t, finalRecoveryMgr.AppendedLSN(), lastKnownLSN, + "commit entry crash should persist the change") + + // Verify crashed entry content + entry, err := wal.ReadManifest(finalRecoveryMgr.GetEntryPath(lastKnownLSN + 1)) + require.NoError(t, err) + require.Contains(t, entry.String(), "second-recovery-crash", + "entry should contain content from crash during commit") + + // Should be able to append new entries after recovery + logEntryPath5 := createTestLogEntry(t, ctx, "final-recovery-success") + finalLSN, err := finalRecoveryMgr.AppendLogEntry(logEntryPath5) + require.NoError(t, err) + require.Greater(t, finalLSN, lastKnownLSN+1) + + require.NoError(t, finalRecoveryMgr.Close()) + }) +} + func TestManager_Close(t *testing.T) { t.Parallel() diff --git a/internal/gitaly/storage/raftmgr/storage.go b/internal/gitaly/storage/raftmgr/storage.go index 97a4644b5a3..73dd55b38a8 100644 --- 
a/internal/gitaly/storage/raftmgr/storage.go +++ b/internal/gitaly/storage/raftmgr/storage.go @@ -115,6 +115,9 @@ type Storage struct { consumer storage.LogConsumer stagingDir string snapshotter Snapshotter + + // hooks is a collection of hooks, used in test environment to intercept critical events + hooks testHooks } // raftManifestPath returns the path to the manifest file within a log entry directory. The manifest file contains @@ -175,6 +178,7 @@ func NewStorage( consumer: consumer, stagingDir: stagingDirectory, snapshotter: snapshotter, + hooks: noopHooks(), }, nil } @@ -375,6 +379,8 @@ func (s *Storage) saveHardState(hardState raftpb.HardState) error { committedLSN := storage.LSN(hardState.Commit) if err := func() error { + s.hooks.BeforeSaveHardState() + s.mutex.Lock() defer s.mutex.Unlock() @@ -497,6 +503,8 @@ func (s *Storage) draftLogEntry(raftEntry raftpb.Entry, callback func(*wal.Entry // insertLogEntry inserts a log entry to WAL at a certain position with respective Raft metadata. func (s *Storage) insertLogEntry(raftEntry raftpb.Entry, logEntryPath string) error { + s.hooks.BeforeInsertLogEntry(raftEntry.Index) + s.mutex.Lock() defer s.mutex.Unlock() -- GitLab From c12ee1385c8f1a3e2b3c79c52b7d6bdec12dd053 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 28 Feb 2025 16:01:53 +0700 Subject: [PATCH 05/10] raft: Generalize snapshotter metrics In raftmgr package, there are some prometheus metrics encapsulated in a metrics object. Those metrics are used for snapshotter at the moment. In the future, Raft package needs more metrics to track the activity of Raft. Therefore, this commit generalizes the snapshotter-specific object so that it can be used to store broader types of metrics. --- .../raftmgr/{snapshot_metrics.go => metrics.go} | 12 ++++++------ internal/gitaly/storage/raftmgr/snapshotter.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) rename internal/gitaly/storage/raftmgr/{snapshot_metrics.go => metrics.go} (77%) diff --git a/internal/gitaly/storage/raftmgr/snapshot_metrics.go b/internal/gitaly/storage/raftmgr/metrics.go similarity index 77% rename from internal/gitaly/storage/raftmgr/snapshot_metrics.go rename to internal/gitaly/storage/raftmgr/metrics.go index ac48c8e8395..6dd2b8d3d6f 100644 --- a/internal/gitaly/storage/raftmgr/snapshot_metrics.go +++ b/internal/gitaly/storage/raftmgr/metrics.go @@ -2,7 +2,7 @@ package raftmgr import "github.com/prometheus/client_golang/prometheus" -// Metrics contains the unscoped collected by Snapshotter. +// Metrics contains the unscoped collected by Raft activities. type Metrics struct { snapSaveSec *prometheus.HistogramVec } @@ -32,15 +32,15 @@ func (m *Metrics) Collect(metrics chan<- prometheus.Metric) { m.snapSaveSec.Collect(metrics) } -// SnapshotterMetrics are metrics scoped for a specific storage -type SnapshotterMetrics struct { +// RaftMetrics are metrics scoped for a specific storage +type RaftMetrics struct { snapSaveSec prometheus.Observer } -// Scope returns snapshotter metrics scoped for a specific storage. -func (m *Metrics) Scope(storage string) SnapshotterMetrics { +// Scope returns Raft metrics scoped for a specific storage. 
+func (m *Metrics) Scope(storage string) RaftMetrics { labels := prometheus.Labels{"storage": storage} - return SnapshotterMetrics{ + return RaftMetrics{ snapSaveSec: m.snapSaveSec.With(labels), } } diff --git a/internal/gitaly/storage/raftmgr/snapshotter.go b/internal/gitaly/storage/raftmgr/snapshotter.go index e58726abe92..4ecb7830623 100644 --- a/internal/gitaly/storage/raftmgr/snapshotter.go +++ b/internal/gitaly/storage/raftmgr/snapshotter.go @@ -40,7 +40,7 @@ type RaftSnapshotter struct { sync.Mutex logger logging.Logger dir string - metrics SnapshotterMetrics + metrics RaftMetrics } // Snapshotter is an interface to implement snapshotting in raft @@ -50,7 +50,7 @@ type Snapshotter interface { } // NewRaftSnapshotter creates a new Snapshotter -func NewRaftSnapshotter(cfg config.Raft, logger logging.Logger, metrics SnapshotterMetrics) (Snapshotter, error) { +func NewRaftSnapshotter(cfg config.Raft, logger logging.Logger, metrics RaftMetrics) (Snapshotter, error) { logger = logger.WithField("component", "raft.snapshotter") logger.Info("Initializing Raft Snapshotter") -- GitLab From 4a9af953b17127a6b8d135713d02c059dded0fdb Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 3 Mar 2025 09:58:11 +0700 Subject: [PATCH 06/10] raft: Simplify snapshotter tests Raft's snapshotter is reponsible for generating Raft-friendly snapshots. A snapshot is sent over the network when Raft detects a missing log entry in replicas. Recently in the test suite, the snapshotter materializes a transaction's snapshot from a real partition. In upcoming commits, `storagemgr/partition` package needs to import `raftmgr` package. That leads to cycle import problem. This commit simplifies the snapshotter test suite. Apparently, the test suite doesn't need to create a full-feature partition. It requires the file-system and key-value DB utilities of the partition only. 
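Concretely, the test now stubs only the two capabilities the snapshotter touches, roughly as below (condensed from the mockTransaction changes in this diff):

    // A minimal transaction stub: the snapshotter needs a key-value
    // read-writer and a snapshot filesystem, not a full partition.
    type mockTransaction struct {
        storage.Transaction
        db keyvalue.Transactioner
        fs fsrecorder.FS
    }

    func (m *mockTransaction) KV() keyvalue.ReadWriter { return m.db.NewTransaction(true) }
    func (m *mockTransaction) FS() storage.FS          { return m.fs }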
--- .../storage/raftmgr/snapshotter_test.go | 151 +++++++++++------- 1 file changed, 91 insertions(+), 60 deletions(-) diff --git a/internal/gitaly/storage/raftmgr/snapshotter_test.go b/internal/gitaly/storage/raftmgr/snapshotter_test.go index 8d5ba51d8d1..15f745cf989 100644 --- a/internal/gitaly/storage/raftmgr/snapshotter_test.go +++ b/internal/gitaly/storage/raftmgr/snapshotter_test.go @@ -2,6 +2,8 @@ package raftmgr import ( "bufio" + "bytes" + "context" "os" "path/filepath" "sync" @@ -9,28 +11,22 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/archive" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" - "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/fsrecorder" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/protobuf/encoding/protodelim" ) func TestRaftSnapshotter_materializeSnapshot(t *testing.T) { t.Parallel() - if !testhelper.IsWALEnabled() { - t.Skip(`Transactions must be enabled for raft snapshots to work.`) - } - - // Setup partition with transaction enabled ctx := testhelper.Context(t) cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ Raft: config.Raft{ @@ -38,53 +34,44 @@ func TestRaftSnapshotter_materializeSnapshot(t *testing.T) { }, })) logger := testhelper.NewLogger(t) - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - // Setup factories - cmdFactory := gittest.NewCommandFactory(t, cfg) - locator := config.NewLocator(cfg) - localRepoFactory := localrepo.NewFactory(logger, locator, cmdFactory, catfileCache) - partitionFactory := partition.NewFactory(cmdFactory, localRepoFactory, partition.NewMetrics(nil), nil) - storageName := cfg.Storages[0].Name storagePath := cfg.Storages[0].Path - // Setup db mgr and storage mgr - dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) - require.NoError(t, err) - defer dbMgr.Close() - storageMgr, err := storagemgr.NewStorageManager( - logger, - cfg.Storages[0].Name, - cfg.Storages[0].Path, - dbMgr, - partitionFactory, - 1, - storagemgr.NewMetrics(cfg.Prometheus), - ) - require.NoError(t, err) - defer storageMgr.Close() - - // Retrieve partition from storage - p, err := storageMgr.GetPartition(ctx, storage.PartitionID(1)) - require.NoError(t, err) - // Create repo so partition is not empty repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ SkipCreationViaService: true, Storage: config.Storage{Name: storageName, Path: storagePath}, }) - metrics := NewMetrics().Scope("default") - // Setup snapshotter - s, err := NewRaftSnapshotter(cfg.Raft, logger, metrics) + // 
Add some KV entries to the DB so that the collected DB is not empty. + db := getTestDBManager(t, ctx, cfg, logger) + dbTxn := db.NewTransaction(true) + require.NoError(t, dbTxn.Set([]byte("hello"), []byte("world"))) + require.NoError(t, dbTxn.Commit()) + + // Setup a proper snapshot with partition snapshot manager + snapshotManager, err := snapshot.NewManager( + logger, + storagePath, + testhelper.TempDir(t), + snapshot.NewMetrics().Scope(storageName), + ) require.NoError(t, err) + defer testhelper.MustClose(t, snapshotManager) - // Begin transaction on partition - txn, err := p.Begin(ctx, storage.BeginOptions{ - RelativePaths: []string{repo.GetRelativePath()}, - }) + // Create a new snapshot of the partition + snapshot, err := snapshotManager.GetSnapshot(ctx, []string{repo.GetRelativePath()}, false) + require.NoError(t, err) + defer testhelper.MustClose(t, snapshot) + + // Create a mock transaction which depends on the newly created snapshot. + txn := &mockTransaction{ + db: db, + fs: fsrecorder.NewFS(snapshot.Root(), wal.NewEntry(testhelper.TempDir(t))), + } + + // Setup snapshotter + s, err := NewRaftSnapshotter(cfg.Raft, logger, NewMetrics().Scope("default")) require.NoError(t, err) // Package partition's disk into snapshot @@ -101,29 +88,57 @@ func TestRaftSnapshotter_materializeSnapshot(t *testing.T) { require.NoError(t, err) defer tar.Close() + // This commit is a no-op require.NoError(t, txn.Commit(ctx)) // Tar should contain a directory with repository - testhelper.ContainsTarState(t, bufio.NewReader(tar), testhelper.DirectoryState{ + directoryState := testhelper.DirectoryState{ filepath.Join("fs", repo.GetRelativePath()): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "HEAD"): {Mode: archive.TarFileMode, Content: []byte("ref: refs/heads/main\n")}, filepath.Join("fs", repo.GetRelativePath(), "objects"): {Mode: archive.DirectoryMode}, filepath.Join("fs", repo.GetRelativePath(), "objects", "info"): {Mode: archive.DirectoryMode}, filepath.Join("fs", repo.GetRelativePath(), "objects", "pack"): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "refs"): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "refs", "heads"): {Mode: archive.DirectoryMode}, - filepath.Join("fs", repo.GetRelativePath(), "refs", "tags"): {Mode: archive.DirectoryMode}, - "kv-state": {Mode: archive.TarFileMode, Content: []byte{}}, - }) + filepath.Join("fs", repo.GetRelativePath(), "config"): { + Mode: archive.TarFileMode, + Content: "config content", + ParseContent: func(tb testing.TB, path string, content []byte) any { + require.Equal(t, filepath.Join("fs", repo.GetRelativePath(), "config"), path) + return "config content" + }, + }, + filepath.Join("fs", repo.GetRelativePath(), "refs"): {Mode: archive.DirectoryMode}, + "kv-state": {Mode: archive.TarFileMode, ParseContent: func(tb testing.TB, path string, content []byte) any { + var keyPair gitalypb.KVPair + require.NoError(t, protodelim.UnmarshalFrom(bytes.NewReader(content), &keyPair)) + + testhelper.ProtoEqual(t, &gitalypb.KVPair{ + Key: []byte("hello"), + Value: []byte("world"), + }, &keyPair) + return nil + }}, + } + if testhelper.IsReftableEnabled() { + directoryState[filepath.Join("fs", repo.GetRelativePath(), "refs", "heads")] = testhelper.DirectoryEntry{ + Mode: archive.TarFileMode, + Content: []byte("this repository uses the reftable format\n"), + } + } else { + directoryState[filepath.Join("fs", repo.GetRelativePath(), "HEAD")] = testhelper.DirectoryEntry{ + Mode: 
archive.TarFileMode, Content: []byte("ref: refs/heads/main\n"), + } + directoryState[filepath.Join("fs", repo.GetRelativePath(), "refs", "heads")] = testhelper.DirectoryEntry{ + Mode: archive.DirectoryMode, + } + directoryState[filepath.Join("fs", repo.GetRelativePath(), "refs", "tags")] = testhelper.DirectoryEntry{ + Mode: archive.DirectoryMode, + } + } + testhelper.ContainsTarState(t, bufio.NewReader(tar), directoryState) } func TestRaftStorage_TriggerSnapshot(t *testing.T) { t.Parallel() - if !testhelper.IsWALEnabled() { - t.Skip(`Transactions must be enabled for raft snapshots to work.`) - } - ctx := testhelper.Context(t) t.Run("reject snapshot creation if no transaction found in context", func(t *testing.T) { // Create a mock Storage and mock RaftSnapshotter @@ -227,9 +242,25 @@ func (m *MockRaftSnapshotter) materializeSnapshot(snapshotMetadata SnapshotMetad }, nil } -type mockTransaction struct{ storage.Transaction } +type mockTransaction struct { + storage.Transaction + db keyvalue.Transactioner + fs fsrecorder.FS +} -// Mock SnapshotLSN func (*mockTransaction) SnapshotLSN() storage.LSN { return storage.LSN(1) } + +func (m *mockTransaction) KV() keyvalue.ReadWriter { + return m.db.NewTransaction(true) +} + +func (m *mockTransaction) FS() storage.FS { + return m.fs +} + +func (m *mockTransaction) Commit(context.Context) error { + // No-Op + return nil +} -- GitLab From b437e2bcbf11065758cb2d5a8955cd8c45ee5b6a Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 9 Jan 2025 09:48:09 +0700 Subject: [PATCH 07/10] raft: Integrate Raft Manager into Transaction Manager Integrating the Raft consensus algorithm required creating a Raft Manager for Gitaly. This commit embeds the Raft Manager factory into the Transaction Manager factory, allowing Gitaly to create Raft Manager instances conditionally based on configuration settings. Test modifications were made to include this new parameter. The parameter is set to nil to ensure seamless integration without impacting existing test cases. Later commits will add extra factory setups case by case. 
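For illustration, the call sites updated in this commit boil down to the following shape (condensed; with Raft disabled or the factory left nil, behavior is unchanged):

    // Build a Raft manager factory only when Raft is enabled; a nil factory
    // makes the partition factory fall back to the plain WAL log manager.
    var raftFactory raftmgr.RaftManagerFactory
    if cfg.Raft.Enabled {
        raftFactory = raftmgr.DefaultFactory(cfg.Raft)
    }

    factory := partition.NewFactory(
        cmdFactory,
        localRepoFactory,
        partition.NewMetrics(housekeepingMetrics),
        logConsumer,
        cfg.Raft,
        raftFactory,
    )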
--- internal/cli/gitaly/serve.go | 17 ++++- internal/cli/gitaly/subcmd_recovery.go | 2 + internal/cli/gitaly/subcmd_recovery_test.go | 6 ++ .../housekeeping/manager/testhelper_test.go | 2 + internal/git/objectpool/fetch_test.go | 2 + internal/gitaly/storage/raftmgr/manager.go | 22 +++++- .../storage/storagemgr/partition/factory.go | 75 +++++++++++++++---- .../storage/storagemgr/partition/metrics.go | 4 + .../partition/migration/manager_test.go | 2 +- .../migration/xxxx_reftable_migration_test.go | 2 +- .../storagemgr/partition/testhelper_test.go | 2 +- .../partition/transaction_manager_test.go | 2 +- internal/testhelper/testserver/gitaly.go | 2 + 13 files changed, 117 insertions(+), 23 deletions(-) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 7dee9d04109..512eb2c3f04 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -371,9 +371,10 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { storageMetrics := storagemgr.NewMetrics(cfg.Prometheus) housekeepingMetrics := housekeeping.NewMetrics(cfg.Prometheus) + raftMetrics := raftmgr.NewMetrics() partitionMetrics := partition.NewMetrics(housekeepingMetrics) migrationMetrics := migration.NewMetrics() - prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics) + prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics) migrations := []migration.Migration{} @@ -409,6 +410,11 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { logConsumer = walArchiver } + var raftFactory raftmgr.RaftManagerFactory + if cfg.Raft.Enabled { + raftFactory = raftmgr.DefaultFactory(cfg.Raft) + } + nodeMgr, err := nodeimpl.NewManager( cfg.Storages, storagemgr.NewFactory( @@ -420,6 +426,8 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { localrepoFactory, partitionMetrics, logConsumer, + cfg.Raft, + raftFactory, ), migrationMetrics, migrations, @@ -469,6 +477,8 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { localrepoFactory, partitionMetrics, nil, + cfg.Raft, + nil, ), // In recovery mode we don't want to keep inactive partitions active. The cache // however can't be disabled so simply set it to one. 
@@ -543,13 +553,12 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("resolve backup locator: %w", err) } } - raftTransport := &raftmgr.GrpcTransport{} + + var raftTransport *raftmgr.GrpcTransport if cfg.Raft.Enabled { raftManagerRegistry := raftmgr.NewRaftManagerRegistry() routingTable := raftmgr.NewStaticRaftRoutingTable() raftTransport = raftmgr.NewGrpcTransport(logger, cfg, routingTable, raftManagerRegistry, conns) - raftSnapshotterMetrics := raftmgr.NewMetrics() - prometheus.MustRegister(raftSnapshotterMetrics) } var bundleURIManager *bundleuri.GenerationManager diff --git a/internal/cli/gitaly/subcmd_recovery.go b/internal/cli/gitaly/subcmd_recovery.go index 275543cacd4..887342c2fb9 100644 --- a/internal/cli/gitaly/subcmd_recovery.go +++ b/internal/cli/gitaly/subcmd_recovery.go @@ -562,6 +562,8 @@ func setupRecoveryContext(ctx *cli.Context) (rc recoveryContext, returnErr error localrepo.NewFactory(logger, config.NewLocator(cfg), gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), migration.NewMetrics(), []migration.Migration{}, diff --git a/internal/cli/gitaly/subcmd_recovery_test.go b/internal/cli/gitaly/subcmd_recovery_test.go index 24f89366c90..b43467acf93 100644 --- a/internal/cli/gitaly/subcmd_recovery_test.go +++ b/internal/cli/gitaly/subcmd_recovery_test.go @@ -342,6 +342,8 @@ Available WAL backup entries: up to LSN: %s`, localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), 1, storagemgr.NewMetrics(cfg.Prometheus), @@ -650,6 +652,8 @@ Successfully processed log entries up to LSN: %s`, localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), 1, storagemgr.NewMetrics(cfg.Prometheus), @@ -697,6 +701,8 @@ Successfully processed log entries up to LSN: %s`, localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), 1, storagemgr.NewMetrics(cfg.Prometheus), diff --git a/internal/git/housekeeping/manager/testhelper_test.go b/internal/git/housekeeping/manager/testhelper_test.go index f1b463bfac4..5db1581b9d4 100644 --- a/internal/git/housekeeping/manager/testhelper_test.go +++ b/internal/git/housekeeping/manager/testhelper_test.go @@ -109,6 +109,8 @@ func testWithAndWithoutTransaction(t *testing.T, ctx context.Context, desc strin localRepoFactory, partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), storagemgr.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus), diff --git a/internal/git/objectpool/fetch_test.go b/internal/git/objectpool/fetch_test.go index e50efd05e5a..6ce2c8d6991 100644 --- a/internal/git/objectpool/fetch_test.go +++ b/internal/git/objectpool/fetch_test.go @@ -434,6 +434,8 @@ func testWithAndWithoutTransaction(t *testing.T, ctx context.Context, testFunc f localRepoFactory, partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), storagemgr.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus), diff --git a/internal/gitaly/storage/raftmgr/manager.go b/internal/gitaly/storage/raftmgr/manager.go index d793e560178..a3fd5becba5 100644 --- a/internal/gitaly/storage/raftmgr/manager.go +++ b/internal/gitaly/storage/raftmgr/manager.go @@ -158,7 +158,27 @@ func 
applyOptions(raftCfg config.Raft, opts []OptionFunc) (ManagerOptions, error return options, nil } -// NewManager creates a new Raft Manager instance. +// RaftManagerFactory defines a function type that creates a new Raft Manager instance. +type RaftManagerFactory func( + storageName string, + partitionID storage.PartitionID, + raftStorage *Storage, + logger logging.Logger, +) (*Manager, error) + +// DefaultFactory returns a RaftManagerFactory that returns a manager from input raft config +func DefaultFactory(raftCfg config.Raft) RaftManagerFactory { + return func( + storageName string, + partitionID storage.PartitionID, + raftStorage *Storage, + logger logging.Logger, + ) (*Manager, error) { + return NewManager(storageName, partitionID, raftCfg, raftStorage, logger) + } +} + +// NewManager creates an instance of Manager. func NewManager( authorityName string, partitionID storage.PartitionID, diff --git a/internal/gitaly/storage/storagemgr/partition/factory.go b/internal/gitaly/storage/storagemgr/partition/factory.go index 82a92808977..90a06c15d43 100644 --- a/internal/gitaly/storage/storagemgr/partition/factory.go +++ b/internal/gitaly/storage/storagemgr/partition/factory.go @@ -6,8 +6,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" logger "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -15,10 +17,12 @@ import ( // Factory is factory type that can create new partitions. type Factory struct { - cmdFactory gitcmd.CommandFactory - repoFactory localrepo.Factory - metrics Metrics - logConsumer storage.LogConsumer + cmdFactory gitcmd.CommandFactory + repoFactory localrepo.Factory + partitionMetrics Metrics + logConsumer storage.LogConsumer + raftCfg config.Raft + raftFactory raftmgr.RaftManagerFactory } // New returns a new Partition instance. 
@@ -26,8 +30,7 @@ func (f Factory) New( logger logger.Logger, partitionID storage.PartitionID, db keyvalue.Transactioner, - storageName string, - storagePath string, + storageName string, storagePath string, absoluteStateDir string, stagingDir string, ) storagemgr.Partition { @@ -50,7 +53,42 @@ func (f Factory) New( } } - logManager := log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer, positionTracker) + var logManager storage.LogManager + if f.raftCfg.Enabled { + factory := f.raftFactory + if factory == nil { + factory = raftmgr.DefaultFactory(f.raftCfg) + } + + raftStorage, err := raftmgr.NewStorage( + f.raftCfg, + logger, + storageName, + partitionID, + db, + stagingDir, + absoluteStateDir, + f.logConsumer, + positionTracker, + f.partitionMetrics.raft, + ) + if err != nil { + panic(fmt.Errorf("creating raft storage: %w", err)) + } + raftManager, err := factory( + storageName, + partitionID, + raftStorage, + logger, + ) + if err != nil { + panic(fmt.Errorf("creating raft manager: %w", err)) + } + logManager = raftManager + } else { + logManager = log.NewManager(storageName, partitionID, stagingDir, absoluteStateDir, f.logConsumer, positionTracker) + } + return NewTransactionManager( partitionID, logger, @@ -61,22 +99,31 @@ func (f Factory) New( stagingDir, f.cmdFactory, repoFactory, - f.metrics.Scope(storageName), + f.partitionMetrics.Scope(storageName), logManager, ) } -// NewFactory returns a new Factory. +// NewFactory creates a partition factory with the given components: +// - cmdFactory: Used to create Git commands +// - repoFactory: Used to create local repository instances +// - metrics: Used to track partition operations +// - logConsumer: Consumes WAL entries (optional, can be nil) +// - raftFactory: Creates Raft managers for replicated partitions (optional, can be nil) func NewFactory( cmdFactory gitcmd.CommandFactory, repoFactory localrepo.Factory, - metrics Metrics, + partitionMetrics Metrics, logConsumer storage.LogConsumer, + raftCfg config.Raft, + raftFactory raftmgr.RaftManagerFactory, ) Factory { return Factory{ - cmdFactory: cmdFactory, - repoFactory: repoFactory, - metrics: metrics, - logConsumer: logConsumer, + cmdFactory: cmdFactory, + repoFactory: repoFactory, + partitionMetrics: partitionMetrics, + logConsumer: logConsumer, + raftCfg: raftCfg, + raftFactory: raftFactory, } } diff --git a/internal/gitaly/storage/storagemgr/partition/metrics.go b/internal/gitaly/storage/storagemgr/partition/metrics.go index 738221b1a4a..a97033a559a 100644 --- a/internal/gitaly/storage/storagemgr/partition/metrics.go +++ b/internal/gitaly/storage/storagemgr/partition/metrics.go @@ -3,6 +3,7 @@ package partition import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/snapshot" ) @@ -12,6 +13,7 @@ type Metrics struct { // them is the responsibility of the caller. 
housekeeping *housekeeping.Metrics snapshot snapshot.Metrics + raft *raftmgr.Metrics commitQueueDepth *prometheus.GaugeVec commitQueueWaitSeconds *prometheus.HistogramVec @@ -30,6 +32,7 @@ func NewMetrics(housekeeping *housekeeping.Metrics) Metrics { return Metrics{ housekeeping: housekeeping, snapshot: snapshot.NewMetrics(), + raft: raftmgr.NewMetrics(), commitQueueDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "gitaly_transaction_commit_queue_depth", Help: "Records the number transactions waiting in the commit queue.", @@ -65,6 +68,7 @@ func (m Metrics) Describe(out chan<- *prometheus.Desc) { // Collect implements prometheus.Collector. func (m Metrics) Collect(out chan<- prometheus.Metric) { m.snapshot.Collect(out) + m.raft.Collect(out) m.commitQueueDepth.Collect(out) m.commitQueueWaitSeconds.Collect(out) m.transactionControlStatementDurationSeconds.Collect(out) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go index 92429d21f9a..4075dd3c31f 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go @@ -175,7 +175,7 @@ func TestMigrationManager_Begin(t *testing.T) { m := partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)) - factory := partition.NewFactory(cmdFactory, repositoryFactory, m, nil) + factory := partition.NewFactory(cmdFactory, repositoryFactory, m, nil, cfg.Raft, nil) tm := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir) ctx, cancel := context.WithCancel(ctx) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go index ca15cbc6475..78965e9d700 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/xxxx_reftable_migration_test.go @@ -133,7 +133,7 @@ func testReftableMigration(t *testing.T, ctx context.Context) { cmdFactory := gittest.NewCommandFactory(t, cfg) localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) - partitionFactory := partition.NewFactory(cmdFactory, localRepoFactory, partition.NewMetrics(nil), nil) + partitionFactory := partition.NewFactory(cmdFactory, localRepoFactory, partition.NewMetrics(nil), nil, cfg.Raft, nil) database, err := keyvalue.NewBadgerStore(testhelper.SharedLogger(t), t.TempDir()) require.NoError(t, err) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 24671dad1fc..1825e17224b 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -1051,7 +1051,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas // managerRunning tracks whether the manager is running or closed. managerRunning bool // factory is the factory that produces the current TransactionManager - factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer) + factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer, setup.Config.Raft, nil) // transactionManager is the current TransactionManager instance. 
transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) // managerErr is used for synchronizing manager closing and returning diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 7f2c97bb10e..00b8d6ad713 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -2413,7 +2413,7 @@ func BenchmarkTransactionManager(b *testing.B) { // Valid partition IDs are >=1. testPartitionID := storage.PartitionID(i + 1) - factory := NewFactory(cmdFactory, repositoryFactory, m, nil) + factory := NewFactory(cmdFactory, repositoryFactory, m, nil, cfg.Raft, nil) // transactionManager is the current TransactionManager instance. manager := factory.New(logger, testPartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 7c9a8301e36..ad9a6ca3a7c 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -369,6 +369,8 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), nil, + cfg.Raft, + nil, ), migration.NewMetrics(), migrations, -- GitLab From 19708e258a7e09813fdbf46e6a50a1b3b11fd158 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 9 Jan 2025 14:28:11 +0700 Subject: [PATCH 08/10] raft: Enable Raft manager in TransactionManager tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit enabled TransactionManager to interact with Raft Manager, but the test suite remained unchanged. This commit introduces the GITALY_TEST_RAFT environment variable. When set, it embeds a test Raft Manager into the partition factory, ensuring TransactionManager tests flow through the Raft lifecycle. This introduces two major changes when Raft is used: * Initial Election Overhead: Upon starting, Raft Manager performs an initial election, even when the current node is the sole leader. This process takes a few seconds but is amortized over the Gitaly server lifecycle. In production, this occurs only once and has minimal impact. However, for tests that frequently create partitions from scratch, this overhead extends the test setup time. * Non-Deterministic Log Entries: Raft's state machine inserts internal log entries (e.g., configuration changes and empty verification entries) that occupy LSN sequences in a non-deterministic order. These entries don't affect application correctness but complicate assertions on log state during tests. Examples: Without Raft: Entry 1 (update A) -> Entry 2 (update B) With Raft (scenario 1): Entry 1 (ConfChange) -> Entry 2 (Empty verification) -> Entry 3 (update A) -> Entry 4 (update B) With Raft (scenario 2): Entry 1 (ConfChange) -> Entry 2 (update A) -> Entry 3 (Empty verification) -> Entry 4 (update B) To address this, the Raft Manager records all inserted log entries with metadata during the test setup. Test helpers adjust expectations accordingly to ensure reliable and accurate assertions. 
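The offsetting itself is easy to picture with a minimal sketch. This is illustrative only: the exampleRecorder type and its Record/Offset methods below are assumptions made for this example and merely stand in for the raftmgr.EntryRecorder used by the tests, whose real implementation may differ.

    package main

    import (
    	"fmt"
    	"sort"
    )

    // LSN stands in for storage.LSN; a plain uint64 keeps the sketch self-contained.
    type LSN uint64

    // exampleRecorder remembers which LSNs were consumed by Raft-internal entries
    // (ConfChange, empty verification entries, and so on).
    type exampleRecorder struct {
    	raftLSNs []LSN
    }

    // Record notes that Raft inserted an internal entry at the given LSN.
    func (r *exampleRecorder) Record(lsn LSN) {
    	r.raftLSNs = append(r.raftLSNs, lsn)
    	sort.Slice(r.raftLSNs, func(i, j int) bool { return r.raftLSNs[i] < r.raftLSNs[j] })
    }

    // Offset translates the LSN a Raft-less test expects into the position the entry
    // actually occupies once internal entries are interleaved: every internal entry
    // at or below the translated position shifts the expectation by one.
    func (r *exampleRecorder) Offset(expected LSN) LSN {
    	actual := expected
    	for _, raftLSN := range r.raftLSNs {
    		if raftLSN <= actual {
    			actual++
    		}
    	}
    	return actual
    }

    func main() {
    	// Scenario 1: ConfChange at LSN 1, empty verification entry at LSN 2.
    	rec := &exampleRecorder{}
    	rec.Record(1)
    	rec.Record(2)
    	fmt.Println(rec.Offset(1), rec.Offset(2)) // prints "3 4": update A -> 3, update B -> 4
    }

With internal entries recorded at LSNs 1 and 2 (scenario 1 above), an expected LSN of 1 maps to 3 and an expected LSN of 2 maps to 4.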
Some tests are skipped because they manually insert log entries or lack a reliable mechanism to assert results in the presence of Raft’s internal log entries. These tests require further adjustments to ensure compatibility with Raft Manager's behavior. --- .../storagemgr/partition/testhelper_test.go | 150 +++++++++++++++++- .../transaction_manager_consumer_test.go | 5 +- .../transaction_manager_hook_test.go | 16 +- .../partition/transaction_manager_test.go | 46 +++++- internal/testhelper/testhelper.go | 24 +++ 5 files changed, 229 insertions(+), 12 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index 1825e17224b..6f4ece9124b 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -10,11 +10,14 @@ import ( "os" "path/filepath" "reflect" + "regexp" "sort" + "strconv" "strings" "sync" "testing" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -36,10 +39,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" + logging "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) func TestMain(m *testing.M) { @@ -672,14 +678,30 @@ func RequireDatabase(tb testing.TB, ctx context.Context, database keyvalue.Trans // Unmarshal the actual value to the same type as the expected value. actualValue := reflect.New(reflect.TypeOf(expectedValue).Elem()).Interface().(proto.Message) - require.NoError(tb, proto.Unmarshal(value, actualValue)) + require.NoErrorf(tb, proto.Unmarshal(value, actualValue), "unmarshalling value in db: %q", key) + + if _, isAny := expectedValue.(*anypb.Any); isAny { + // Verify if the database contains the key, but the content does not matter. + actualState[string(key)] = expectedValue + continue + } actualState[string(key)] = actualValue } return nil })) - require.Empty(tb, unexpectedKeys, "database contains unexpected keys") + require.Emptyf(tb, unexpectedKeys, "database contains unexpected keys") + + // Ignore optional keys in expectedState but not in actualState + for k, v := range expectedState { + if _, isAny := v.(*anypb.Any); isAny { + if _, exist := actualState[k]; !exist { + delete(expectedState, k) + } + } + } + testhelper.ProtoEqual(tb, expectedState, actualState) } @@ -960,6 +982,8 @@ type RepositoryAssertion struct { type StateAssertion struct { // Database is the expected state of the database. Database DatabaseState + // NotOffsetDatabaseInRaft indicates if the LSN in the database should not be shifted. + NotOffsetDatabaseInRaft bool // Directory is the expected state of the manager's state directory in the repository. Directory testhelper.DirectoryState // Repositories is the expected state of the repositories in the storage. 
The key is @@ -1047,11 +1071,27 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas return NewMetrics(housekeeping.NewMetrics(setup.Config.Prometheus)) } + var raftFactory raftmgr.RaftManagerFactory + var raftEntryRecorder *raftmgr.EntryRecorder + clusterID := uuid.New().String() + if testhelper.IsRaftEnabled() { + raftEntryRecorder = raftmgr.NewEntryRecorder() + setup.Config.Raft = config.DefaultRaftConfig(clusterID) + // Speed up initial election overhead in the test setup + setup.Config.Raft.ElectionTicks = 5 + setup.Config.Raft.RTTMilliseconds = 100 + setup.Config.Raft.SnapshotDir = testhelper.TempDir(t) + + raftFactory = func(storageName string, partitionID storage.PartitionID, raftStorage *raftmgr.Storage, logger logging.Logger) (*raftmgr.Manager, error) { + return raftmgr.NewManager(storageName, partitionID, setup.Config.Raft, raftStorage, logger, raftmgr.WithEntryRecorder(raftEntryRecorder)) + } + } + var ( // managerRunning tracks whether the manager is running or closed. managerRunning bool // factory is the factory that produces the current TransactionManager - factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer, setup.Config.Raft, nil) + factory = NewFactory(setup.CommandFactory, setup.RepositoryFactory, newMetrics(), setup.Consumer, setup.Config.Raft, raftFactory) // transactionManager is the current TransactionManager instance. transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) // managerErr is used for synchronizing manager closing and returning @@ -1103,7 +1143,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) transactionManager = factory.New(logger, setup.PartitionID, database, storageName, storagePath, stateDir, stagingDir).(*TransactionManager) - installHooks(transactionManager, &inflightTransactions, step.Hooks) + installHooks(transactionManager, &inflightTransactions, step.Hooks, raftEntryRecorder) go func() { defer func() { @@ -1145,7 +1185,11 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas require.NoError(t, err) tx := transaction.(*Transaction) - require.Equalf(t, step.ExpectedSnapshotLSN, tx.SnapshotLSN(), "mismatched ExpectedSnapshotLSN") + expectedSnapshotLSN := step.ExpectedSnapshotLSN + if testhelper.IsRaftEnabled() { + expectedSnapshotLSN = raftEntryRecorder.Offset(expectedSnapshotLSN) + } + require.Equalf(t, expectedSnapshotLSN, tx.SnapshotLSN(), "mismatched ExpectedSnapshotLSN") require.NotEmpty(t, tx.Root(), "empty Root") require.Contains(t, tx.Root(), transactionManager.snapshotsDir()) @@ -1420,7 +1464,11 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction := openTransactions[step.TransactionID] transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: - require.NoError(t, transactionManager.logManager.AcknowledgePosition(log.ConsumerPosition, step.LSN)) + lsn := step.LSN + if testhelper.IsRaftEnabled() { + lsn = raftEntryRecorder.Offset(lsn) + } + require.NoError(t, transactionManager.logManager.AcknowledgePosition(log.ConsumerPosition, lsn)) case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] @@ -1487,6 +1535,36 @@ func runTransactionTest(t *testing.T, ctx 
context.Context, tc transactionTestCas require.NoError(t, err) } + // If singular Raft cluster is enabled, the Raft handler starts to insert raft log entries. We need to offset + // the expected LSN to account for LSN shifting. As the order of insertion is not deterministic, the LSN + // shifting depends on a Raft event recorder. + // + // Without Raft: + // Entry 1 (update A) -> Entry 2 (update B) + // + // With Raft: + // Entry 1 (ConfChange) -> Entry 2 (Empty verification entry) -> Entry 3 (update A) -> Entry 4 (update B) + if testhelper.IsRaftEnabled() && raftEntryRecorder.Len() > 0 { + if !tc.expectedState.NotOffsetDatabaseInRaft { + appliedLSN, exist := tc.expectedState.Database[string(keyAppliedLSN)] + if exist { + // If expected applied LSN is present, offset expected LSN. + appliedLSN := appliedLSN.(*gitalypb.LSN) + tc.expectedState.Database[string(keyAppliedLSN)] = raftEntryRecorder.Offset(storage.LSN(appliedLSN.GetValue())).ToProto() + } else { + // Otherwise, the test expects no applied log entry in cases such as invalid transactions. + // Regardless, raft log entries are applied successfully. + if tc.expectedState.Database == nil { + tc.expectedState.Database = DatabaseState{} + } + tc.expectedState.Database[string(keyAppliedLSN)] = raftEntryRecorder.Latest().ToProto() + } + } + // Those are raft-specific keys. They should not be a concern of TransactionManager tests. + for _, key := range raftmgr.RaftDBKeys { + tc.expectedState.Database[string(key)] = &anypb.Any{} + } + } RequireDatabase(t, ctx, database, tc.expectedState.Database) expectedRepositories := tc.expectedState.Repositories @@ -1538,6 +1616,8 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas } } + expectedDirectory = modifyDirectoryStateForRaft(t, expectedDirectory, transactionManager, raftEntryRecorder) + testhelper.RequireDirectoryState(t, stateDir, "", expectedDirectory) expectedStagingDirState := testhelper.DirectoryState{ @@ -1555,6 +1635,64 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas testhelper.RequireDirectoryState(t, transactionManager.stagingDirectory, "", expectedStagingDirState) } +// modifyDirectoryStateForRaft modifies log entry directory. It does three major things: +// - Shift expected log entry paths to the corresponding location. The LSN of normal entries are affected by Raft's +// internal entries. +// - Insert RAFT metadata files. +// - Insert Raft's internal log entries if they are not pruned away. 
+func modifyDirectoryStateForRaft(t *testing.T, expectedDirectory testhelper.DirectoryState, tm *TransactionManager, recorder *raftmgr.EntryRecorder) testhelper.DirectoryState { + if !testhelper.IsRaftEnabled() { + return expectedDirectory + } + + newExpectedDirectory := testhelper.DirectoryState{} + lsnRegex := regexp.MustCompile(`/wal/(\d+)\/?.*`) + for path, state := range expectedDirectory { + matches := lsnRegex.FindStringSubmatch(path) + if len(matches) == 0 { + // If no LSN found, retain the original entry + newExpectedDirectory[path] = state + continue + } + // Extract and offset the LSN + lsnStr := matches[1] + lsn, err := strconv.ParseUint(lsnStr, 10, 64) + require.NoError(t, err) + + offsetLSN := recorder.Offset(storage.LSN(lsn)) + offsetPath := strings.Replace(path, fmt.Sprintf("/%s", lsnStr), fmt.Sprintf("/%s", offsetLSN.String()), 1) + newExpectedDirectory[offsetPath] = state + newExpectedDirectory[fmt.Sprintf("/wal/%s/RAFT", offsetLSN)] = testhelper.DirectoryEntry{ + Mode: mode.File, + Content: "anything", + ParseContent: func(tb testing.TB, path string, content []byte) any { + // The test should not be concerned with the actual content of RAFT file. + return "anything" + }, + } + } + + // Insert Raft-specific log entries into the new directory structure + raftEntries := recorder.FromRaft() + for lsn, raftEntry := range raftEntries { + if lsn >= tm.GetLogReader().LowWaterMark() { + newExpectedDirectory[fmt.Sprintf("/wal/%s", lsn)] = testhelper.DirectoryEntry{Mode: mode.Directory} + newExpectedDirectory[fmt.Sprintf("/wal/%s/MANIFEST", lsn)] = manifestDirectoryEntry(raftEntry) + newExpectedDirectory[fmt.Sprintf("/wal/%s/RAFT", lsn)] = testhelper.DirectoryEntry{ + Mode: mode.File, + Content: "anything", + ParseContent: func(tb testing.TB, path string, content []byte) any { + // The test should not be concerned with the actual content of RAFT file. 
+ return "anything" + }, + } + } + } + + // Update expected directory + return newExpectedDirectory +} + func checkManagerError(t *testing.T, ctx context.Context, managerErrChannel chan error, mgr *TransactionManager) (bool, error) { t.Helper() diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go index e29c0d51cf4..0aad4c28496 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_consumer_test.go @@ -462,7 +462,10 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti }, }, { - desc: "stopped manager does not prune acknowledged entry", + desc: "stopped manager does not prune acknowledged entry", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, "asserting pruning of internal log entries in this case is not relibale") + }, customSetup: customSetup, steps: steps{ StartManager{}, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 529afda61dc..0915106430b 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -8,6 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/refdb" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" ) @@ -22,10 +23,18 @@ type hookContext struct { closeManager func() // lsn stores the LSN context when the hook is triggered. lsn storage.LSN + // raftEntryRecorder normal and Raft-specific entries that are inserted into the WAL. It is used to filter out entries + // injected by Raft in hook executions. + raftEntryRecorder *raftmgr.EntryRecorder } // installHooks takes the hooks in the test setup and configures them in the TransactionManager. 
-func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) { +func installHooks( + mgr *TransactionManager, + inflightTransactions *sync.WaitGroup, + hooks testTransactionHooks, + raftEntryRecorder *raftmgr.EntryRecorder, +) { for destination, source := range map[*func()]hookFunc{ &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, &mgr.testHooks.beforeRunExiting: func(hookContext) { @@ -56,8 +65,9 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, runHook := source *destination = func(lsn storage.LSN) { runHook(hookContext{ - closeManager: mgr.Close, - lsn: lsn, + closeManager: mgr.Close, + lsn: lsn, + raftEntryRecorder: raftEntryRecorder, }) } } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 00b8d6ad713..57ff1158538 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -43,8 +43,16 @@ var errSimulatedCrash = errors.New("simulated crash") // simulateCrashHook returns a hook function that panics with errSimulatedCrash. var simulateCrashHook = func() func(hookContext) { - return func(hookContext) { - panic(errSimulatedCrash) + return func(c hookContext) { + if !testhelper.IsRaftEnabled() { + // Always panic if Raft is not enabled + panic(errSimulatedCrash) + } else if c.raftEntryRecorder == nil { + // Some hooks don't include Raft entry recorder. + panic(errSimulatedCrash) + } else if !c.raftEntryRecorder.IsFromRaft(c.lsn) { + panic(errSimulatedCrash) + } } } @@ -491,6 +499,11 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio }(), { desc: "commit returns if transaction processing stops before transaction acceptance", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `The hook is installed before appending log entry, before + recorder is activated. Hence, it's not feasible to differentiate between normal + entries and Raft internal entries`) + }, steps: steps{ StartManager{ Hooks: testTransactionHooks{ @@ -532,6 +545,15 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio }, }, expectedState: StateAssertion{ + Database: testhelper.WithOrWithoutRaft( + // The process crashes before apply but after it's committed. So, only + // committedLSN is persisted. 
+ DatabaseState{ + string(keyAppliedLSN): storage.LSN(2).ToProto(), + }, + DatabaseState{}, + ), + NotOffsetDatabaseInRaft: true, Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -1459,6 +1481,10 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio ExpectedError: errSimulatedCrash, }, }, + expectedState: StateAssertion{ + Database: DatabaseState{}, + NotOffsetDatabaseInRaft: true, + }, }, { desc: "transaction rollbacked after already being rollbacked", @@ -1755,6 +1781,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t return []transactionTestCase{ { desc: "manager has just initialized", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { @@ -1764,6 +1793,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "a transaction has one reader", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, Begin{ @@ -1872,6 +1904,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "a transaction has multiple readers", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, Begin{ @@ -2037,6 +2072,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "committed read-only transaction are not kept", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, Begin{ @@ -2073,6 +2111,9 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, { desc: "transaction manager cleans up left-over committed entries when appliedLSN == appendedLSN", + skip: func(t *testing.T) { + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) + }, steps: steps{ StartManager{}, Begin{ @@ -2202,6 +2243,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t desc: "transaction manager cleans up left-over committed entries when appliedLSN < appendedLSN", skip: func(t *testing.T) { testhelper.SkipWithReftable(t, "test requires manual log addition") + testhelper.SkipWithRaft(t, `this test assert internal state of TransactionManager, which is hard to interfere`) }, steps: steps{ StartManager{}, diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index d1d51e3f4f6..9130e142f14 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -57,6 +57,30 @@ func IsWALEnabled() bool { return ok } +// IsRaftEnabled returns whether Raft single cluster is enabled in this testing run. +func IsRaftEnabled() bool { + _, ok := os.LookupEnv("GITALY_TEST_RAFT") + if ok && !IsWALEnabled() { + panic("GITALY_TEST_WAL must be enabled") + } + return ok +} + +// WithOrWithoutRaft returns a value correspondingly to if Raft is enabled or not. 
+func WithOrWithoutRaft[T any](raftVal, noRaftVal T) T { + if IsRaftEnabled() { + return raftVal + } + return noRaftVal +} + +// SkipWithRaft skips the test if Raft is enabled in this testing run. +func SkipWithRaft(tb testing.TB, reason string) { + if IsRaftEnabled() { + tb.Skip(reason) + } +} + // SkipWithWAL skips the test if write-ahead logging is enabled in this testing run. A reason // should be provided either as a description or a link to an issue to explain why the test is // skipped. -- GitLab From 6005c75d09428fe4751d1c9d696d03615dff5b3a Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 9 Jan 2025 17:43:01 +0700 Subject: [PATCH 09/10] raft: Add test-raft job to the test matrices This commit creates new test target. This test target initiates Raft factory and adds it to gRPC server dependency. Along the way, some tests are skipped because there are no reliable ways to assert the results when Raft is enabled. --- .gitlab-ci.yml | 8 ++++---- Makefile | 6 ++++++ internal/backup/partition_backup_test.go | 4 ++++ internal/cli/gitaly/subcmd_recovery_test.go | 4 ++++ internal/cli/gitalybackup/partition_test.go | 4 ++++ .../service/partition/backup_partition_test.go | 6 ++++++ .../service/partition/list_partitions_test.go | 3 +++ internal/testhelper/testcfg/gitaly.go | 9 +++++++++ internal/testhelper/testserver/gitaly.go | 14 +++++++++++++- 9 files changed, 53 insertions(+), 5 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 11a8713663d..9b867f4981c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -257,7 +257,7 @@ test: TEST_TARGET: test USE_MESON: YesPlease - TEST_TARGET: - [test-with-praefect, race-go, test-wal, test-with-praefect-wal] + [test-with-praefect, race-go, test-wal, test-raft, test-with-praefect-wal] # We also verify that things work as expected with a non-bundled Git # version matching our minimum required Git version. - TEST_TARGET: test @@ -313,7 +313,7 @@ test:nightly: parallel: matrix: - GIT_VERSION: ["master", "next"] - TEST_TARGET: [test, test-with-praefect, test-with-reftable, test-with-sha256] + TEST_TARGET: [test, test-with-praefect, test-with-reftable, test-with-sha256, test-wal, test-raft] rules: - if: '$CI_PIPELINE_SOURCE == "schedule"' allow_failure: false @@ -332,7 +332,7 @@ test:sha256: parallel: matrix: - TEST_TARGET: - [test, test-with-praefect, test-wal, test-with-praefect-wal] + [test, test-with-praefect, test-wal, test-raft, test-with-praefect-wal] TEST_WITH_SHA256: "YesPlease" test:fips: @@ -355,7 +355,7 @@ test:fips: - test "$(cat /proc/sys/crypto/fips_enabled)" = "1" || (echo "System is not running in FIPS mode" && exit 1) parallel: matrix: - - TEST_TARGET: [test, test-with-praefect] + - TEST_TARGET: [test, test-with-praefect, test-wal, test-raft] FIPS_MODE: "YesPlease" GO_VERSION: !reference [.versions, go_supported] rules: diff --git a/Makefile b/Makefile index e56a4cc21a1..b30b656d411 100644 --- a/Makefile +++ b/Makefile @@ -440,6 +440,12 @@ test-with-praefect: test-go test-wal: export GITALY_TEST_WAL = YesPlease test-wal: test-go +.PHONY: test-raft +## Run Go tests with write-ahead logging + Raft enabled. +test-raft: export GITALY_TEST_WAL = YesPlease +test-raft: export GITALY_TEST_RAFT = YesPlease +test-raft: test-go + .PHONY: test-with-praefect-wal ## Run Go tests with write-ahead logging and Praefect enabled. 
test-with-praefect-wal: export GITALY_TEST_WAL = YesPlease diff --git a/internal/backup/partition_backup_test.go b/internal/backup/partition_backup_test.go index 38ca581b112..917334df6f3 100644 --- a/internal/backup/partition_backup_test.go +++ b/internal/backup/partition_backup_test.go @@ -114,6 +114,10 @@ func TestPartitionBackup_CreateSuccess(t *testing.T) { require.NoError(t, err) + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is not enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) + for _, expectedArchive := range tc.expectedArchives { tarPath := filepath.Join(backupRoot, "partition-backups", cfg.Storages[0].Name, expectedArchive, storage.LSN(1).String()) + ".tar" tar, err := os.Open(tarPath) diff --git a/internal/cli/gitaly/subcmd_recovery_test.go b/internal/cli/gitaly/subcmd_recovery_test.go index b43467acf93..28d8422a133 100644 --- a/internal/cli/gitaly/subcmd_recovery_test.go +++ b/internal/cli/gitaly/subcmd_recovery_test.go @@ -53,6 +53,8 @@ type setupData struct { func TestRecoveryCLI_status(t *testing.T) { t.Parallel() + testhelper.SkipWithRaft(t, "Raft must not be enabled during recovery") + for _, tc := range []struct { desc string setup func(tb testing.TB, ctx context.Context, opts setupOptions) setupData @@ -379,6 +381,8 @@ Available WAL backup entries: up to LSN: %s`, func TestRecoveryCLI_replay(t *testing.T) { t.Parallel() + testhelper.SkipWithRaft(t, "Raft must not be enabled during recovery") + for _, tc := range []struct { desc string setup func(tb testing.TB, ctx context.Context, opts setupOptions) setupData diff --git a/internal/cli/gitalybackup/partition_test.go b/internal/cli/gitalybackup/partition_test.go index 615ec856618..737837dab91 100644 --- a/internal/cli/gitalybackup/partition_test.go +++ b/internal/cli/gitalybackup/partition_test.go @@ -110,6 +110,10 @@ func TestPartitionSubcommand_Create(t *testing.T) { } require.NoError(t, err) + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is not enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) + lsn := storage.LSN(1) tarPath := filepath.Join(path, "partition-backups", cfg.Storages[0].Name, "2", lsn.String()) + ".tar" tar, err := os.Open(tarPath) diff --git a/internal/gitaly/service/partition/backup_partition_test.go b/internal/gitaly/service/partition/backup_partition_test.go index 564180929ab..6f2174229c1 100644 --- a/internal/gitaly/service/partition/backup_partition_test.go +++ b/internal/gitaly/service/partition/backup_partition_test.go @@ -30,6 +30,9 @@ func TestBackupPartition(t *testing.T) { if testhelper.IsPraefectEnabled() { t.Skip(`Praefect currently doesn't support routing the PARTITION scoped RPC messages.`) } + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is not enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) type setupData struct { cfg config.Cfg @@ -251,6 +254,9 @@ func TestBackupPartition_BackupExists(t *testing.T) { if testhelper.IsPraefectEnabled() { t.Skip(`Praefect currently doesn't support routing the PARTITION scoped RPC messages.`) } + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. 
When Raft is not enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) ctx := testhelper.Context(t) diff --git a/internal/gitaly/service/partition/list_partitions_test.go b/internal/gitaly/service/partition/list_partitions_test.go index e34605c2d5b..76d0af24cfc 100644 --- a/internal/gitaly/service/partition/list_partitions_test.go +++ b/internal/gitaly/service/partition/list_partitions_test.go @@ -19,6 +19,9 @@ func TestListPartitions(t *testing.T) { t.Skip(`Since it is not guaranteed that all the gitaly instances for a given Praefect will have the same partition IDs, this RPC will not be supported for Praefect`) } + testhelper.SkipWithRaft(t, `The test asserts the existence of backup files based on the latest + LSN. When Raft is not enabled, the LSN is not static. The test should fetch the latest + LSN instead https://gitlab.com/gitlab-org/gitaly/-/issues/6459`) ctx := testhelper.Context(t) cfg, ptnClient, repoClient := setupServices(t) diff --git a/internal/testhelper/testcfg/gitaly.go b/internal/testhelper/testcfg/gitaly.go index 726092838af..e8ab8c16fe7 100644 --- a/internal/testhelper/testcfg/gitaly.go +++ b/internal/testhelper/testcfg/gitaly.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/pelletier/go-toml/v2" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" @@ -172,6 +173,14 @@ func (gc *GitalyCfgBuilder) Build(tb testing.TB) config.Cfg { } cfg.Transactions.Enabled = testhelper.IsWALEnabled() + if testhelper.IsRaftEnabled() && !testhelper.IsPraefectEnabled() { + cfg.Transactions.Enabled = true + cfg.Raft = config.DefaultRaftConfig(uuid.New().String()) + // Speed up initial election overhead in the test setup + cfg.Raft.ElectionTicks = 5 + cfg.Raft.RTTMilliseconds = 100 + cfg.Raft.SnapshotDir = testhelper.TempDir(tb) + } // We force the use of bundled (embedded) binaries unless we're specifically executing tests // against a custom version of Git. 
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index ad9a6ca3a7c..75f1493cdaf 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -6,6 +6,7 @@ import (
 	"os"
 	"testing"
 
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
@@ -30,6 +31,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
 	nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/raftmgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration"
@@ -358,6 +360,16 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
 	require.NoError(tb, err)
 	tb.Cleanup(dbMgr.Close)
 
+	var raftFactory raftmgr.RaftManagerFactory
+	if testhelper.IsRaftEnabled() && !testhelper.IsPraefectEnabled() {
+		cfg.Raft = config.DefaultRaftConfig(uuid.New().String())
+		// Speed up initial election overhead in the test setup
+		cfg.Raft.ElectionTicks = 5
+		cfg.Raft.RTTMilliseconds = 100
+		cfg.Raft.SnapshotDir = testhelper.TempDir(tb)
+		raftFactory = raftmgr.DefaultFactory(cfg.Raft)
+	}
+
 	nodeMgr, err := nodeimpl.NewManager(
 		cfg.Storages,
 		storagemgr.NewFactory(
@@ -370,7 +382,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte
 			partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)),
 			nil,
 			cfg.Raft,
-			nil,
+			raftFactory,
 		),
 		migration.NewMetrics(),
 		migrations,
-- 
GitLab


From 3ec525b53c408ef787a7d5436e38aa85d38d33a3 Mon Sep 17 00:00:00 2001
From: Quang-Minh Nguyen
Date: Fri, 28 Feb 2025 12:13:29 +0700
Subject: [PATCH 10/10] raft: Backfill missing fsync for internal entries

When the Raft manager handles internal log entries issued by etcd/raft,
it creates the corresponding WAL entries without going through the
TransactionManager. The most crucial part is the MANIFEST file. At the
moment, Raft storage doesn't issue an fsync on that file. The fsync is
essential to ensure that the content of the file is flushed to disk.
Without it, the file might be damaged and unreadable later.
---
 internal/gitaly/storage/raftmgr/storage.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/internal/gitaly/storage/raftmgr/storage.go b/internal/gitaly/storage/raftmgr/storage.go
index 73dd55b38a8..cc46b24b378 100644
--- a/internal/gitaly/storage/raftmgr/storage.go
+++ b/internal/gitaly/storage/raftmgr/storage.go
@@ -496,6 +496,12 @@ func (s *Storage) draftLogEntry(raftEntry raftpb.Entry, callback func(*wal.Entry
 	}); err != nil {
 		return fmt.Errorf("writing manifest file: %w", err)
 	}
+	// The fsync is essential to flush the content of the manifest file itself. We also need to fsync the parent to
+	// ensure the creation of the file is flushed. That part will be covered in insertLogEntry after the Raft
+	// artifact file is created.
+	if err := safe.NewSyncer().Sync(s.ctx, wal.ManifestPath(walEntry.Directory())); err != nil {
+		return fmt.Errorf("sync raft manifest file: %w", err)
+	}
 
 	// Finally, insert it to WAL.
 	return s.insertLogEntry(raftEntry, logEntryPath)
-- 
GitLab
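For reference, the write-then-fsync pattern that this final commit backfills can be sketched with nothing but the standard library. This is a generic illustration, not Gitaly's safe.Syncer or the storage code above; the writeAndSyncManifest helper and its file layout are assumptions made for the example.

    package main

    import (
    	"fmt"
    	"os"
    	"path/filepath"
    )

    // writeAndSyncManifest is a hypothetical helper: it writes a manifest file,
    // fsyncs the file so its content reaches the disk, and then fsyncs the parent
    // directory so the creation of the file itself is durable.
    func writeAndSyncManifest(dir string, content []byte) error {
    	path := filepath.Join(dir, "MANIFEST")

    	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
    	if err != nil {
    		return fmt.Errorf("creating manifest: %w", err)
    	}
    	defer f.Close()

    	if _, err := f.Write(content); err != nil {
    		return fmt.Errorf("writing manifest: %w", err)
    	}
    	// Flush the file content. Without this, a crash could leave a damaged,
    	// unreadable manifest even though the write call returned successfully.
    	if err := f.Sync(); err != nil {
    		return fmt.Errorf("syncing manifest: %w", err)
    	}

    	// Flush the directory entry so the file's existence survives a crash.
    	d, err := os.Open(dir)
    	if err != nil {
    		return fmt.Errorf("opening parent directory: %w", err)
    	}
    	defer d.Close()
    	if err := d.Sync(); err != nil {
    		return fmt.Errorf("syncing parent directory: %w", err)
    	}
    	return nil
    }

    func main() {
    	dir, err := os.MkdirTemp("", "wal-entry")
    	if err != nil {
    		panic(err)
    	}
    	defer os.RemoveAll(dir)

    	if err := writeAndSyncManifest(dir, []byte("serialized log entry metadata")); err != nil {
    		panic(err)
    	}
    	fmt.Println("manifest written and synced in", dir)
    }

Syncing the parent directory matters as well: a newly created file only becomes durable once the directory entry is flushed, which the commit above defers to insertLogEntry.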