From edec4a8fdd3e54f31b71fcf7073c45f14af41615 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 13 Jan 2025 20:28:17 +0700 Subject: [PATCH 1/3] storage: Add DeleteLogEntriesFrom for trailing log removal In Raft, when log entries with a higher term need to replace previously appended entries, it is necessary to efficiently remove the tail of the WAL. This commit introduces a new method, DeleteLogEntriesFrom, that deletes all log entries starting from a given LSN (inclusive) to the end of the log. The method ensures: - No deletion is performed if a consumer is present, preventing premature notifications to an active consumer. - Deletion is refused if the requested starting LSN is below the low water mark, thereby avoiding removal of entries still in use. - Multiple trailing entries are removed efficiently by renaming them to a "to_delete" folder, with only one fsync after the batch operation. - Proper concurrency is maintained by acquiring both the main and pruning mutexes, ensuring that no conflicting operations occur during deletion. --- .../storagemgr/partition/log/log_manager.go | 57 ++++++ .../partition/log/log_manager_test.go | 181 ++++++++++++++++++ 2 files changed, 238 insertions(+) diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go index f4db452c98f..ceda60647ab 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager.go @@ -403,6 +403,63 @@ func (mgr *Manager) DeleteLogEntry(lsn storage.LSN) error { return nil } +// DeleteTrailingLogEntries removes all log entries starting from the given LSN (inclusive) until the end of the WAL. It +// refuses to delete entries if the requested starting LSN is below the low water mark. +func (mgr *Manager) DeleteTrailingLogEntries(from storage.LSN) (returnedErr error) { + // Use the LowWaterMark() helper. If the requested LSN is below the low water mark, do not proceed. + lowWaterMark := mgr.LowWaterMark() + if from < lowWaterMark { + return fmt.Errorf("requested LSN is below the low water mark") + } + + // Acquire the pruning mutex first to ensure no concurrent pruning activity + mgr.pruningMutex.Lock() + defer mgr.pruningMutex.Unlock() + + // Protect critical state with the main mutex. + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + // If the starting LSN is above the last appended log entry, there's nothing to delete. + if from > mgr.appendedLSN { + return nil + } + + // Determine the WAL directory path. + walDir := StatePath(mgr.stateDirectory) + + // Create a temporary directory to batch-remove log entries. + tmpDir, err := os.MkdirTemp(mgr.tmpDirectory, "") + if err != nil { + return fmt.Errorf("failed to create temporary directory: %w", err) + } + defer func() { + // Remove the temporary directory. + if err := os.RemoveAll(tmpDir); err != nil { + returnedErr = errors.Join(returnedErr, fmt.Errorf("remove files: %w", err)) + } + }() + + // Instead of calling DeleteLogEntry (which triggers an fsync per deletion), + // move each target log entry from 'from' up to the appendedLSN into the temporary directory. + for lsn := mgr.appendedLSN; lsn >= from; lsn-- { + source := EntryPath(mgr.stateDirectory, lsn) + destination := filepath.Join(tmpDir, fmt.Sprintf("%s.%s", lsn.String(), "to_delete")) + if err := os.Rename(source, destination); err != nil { + return fmt.Errorf("rename: %w", err) + } + // Lower the appendedLSN gradually to prevent partial failure. 
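+		// If a later rename fails and we return early, the in-memory appendedLSN still points at the highest
+		// entry that remains on disk.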
+ mgr.appendedLSN = lsn - 1 + } + + // Perform one fsync on the WAL's parent directory to persist changes. + if err := safe.NewSyncer().SyncParent(mgr.ctx, walDir); err != nil { + return fmt.Errorf("failed to sync state directory: %w", err) + } + + return nil +} + // LowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than // this mark are removed. func (mgr *Manager) LowWaterMark() storage.LSN { diff --git a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go index 5e9af7a0b76..aae61b65b0b 100644 --- a/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/log/log_manager_test.go @@ -1142,3 +1142,184 @@ func TestLogManager_NotifyNewEntries(t *testing.T) { } }) } + +func TestLogManager_DeleteTrailingLogEntries(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + t.Run("reject deletion if requested LSN is below low water mark", func(t *testing.T) { + // Setup a log manager without a consumer. + logManager := setupLogManager(t, ctx, nil) + + // Append two log entries. + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) + + // Acknowledge the first entry as applied so that LowWaterMark() becomes 2. + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 1)) + waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) + + // Attempt deletion starting from LSN 1 (which is below the low water mark). + err := logManager.DeleteTrailingLogEntries(1) + require.Error(t, err) + require.Contains(t, err.Error(), "requested LSN is below the low water mark") + + // This method does not delete any log entries. Log entry 1 was deleted by the background + // pruning task, though. + require.Equal(t, storage.LSN(2), logManager.AppendedLSN()) + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": { + Mode: mode.Directory, + }, + "/wal": { + Mode: mode.Directory, + }, + "/wal/0000000000002": { + Mode: mode.Directory, + }, + "/wal/0000000000002/1": { + Mode: mode.File, + Content: []byte("content-2"), + }, + }) + }) + + t.Run("successfully delete tail entries", func(t *testing.T) { + // Setup a log manager without a consumer. + logManager := setupLogManager(t, ctx, nil) + + // Append four log entries. + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) // LSN 1 + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) // LSN 2 + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) // LSN 3 + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-4")}) // LSN 4 + + // Acknowledge the first two entries as applied. + // That makes the low water mark = 2 + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 1)) + waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) + + // Delete entries starting from LSN 3 (which is >= low water mark). + err := logManager.DeleteTrailingLogEntries(3) + require.NoError(t, err) + + // appendedLSN should now be lowered to one before the 'from' LSN. + require.Equal(t, storage.LSN(2), logManager.AppendedLSN()) + + // Assert that only LSN 2 remain in the on-disk WAL. 1 was removed by background pruning task. 
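+		// Entries 3 and 4 were renamed into the temporary directory and removed together with it.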
+ assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": { + Mode: mode.Directory, + }, + "/wal": { + Mode: mode.Directory, + }, + "/wal/0000000000002": { + Mode: mode.Directory, + }, + "/wal/0000000000002/1": { + Mode: mode.File, + Content: []byte("content-2"), + }, + }) + }) + + t.Run("requested LSN above appended LSN (nothing to delete)", func(t *testing.T) { + // Setup a log manager without a consumer. + logManager := setupLogManager(t, ctx, nil) + + // Append two log entries. + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-A")}) // LSN 1 + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-B")}) // LSN 2 + + // Call DeleteLogEntriesFrom with a LSN greater than the current appendedLSN. + err := logManager.DeleteTrailingLogEntries(5) + require.NoError(t, err) + + // appendedLSN should remain unchanged. + require.Equal(t, storage.LSN(2), logManager.appendedLSN) + + // Assert that the WAL still contains both log entries. + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": { + Mode: mode.Directory, + }, + "/wal": { + Mode: mode.Directory, + }, + "/wal/0000000000001": { + Mode: mode.Directory, + }, + "/wal/0000000000001/1": { + Mode: mode.File, + Content: []byte("content-A"), + }, + "/wal/0000000000002": { + Mode: mode.Directory, + }, + "/wal/0000000000002/1": { + Mode: mode.File, + Content: []byte("content-B"), + }, + }) + }) + + t.Run("concurrent deletion invocation", func(t *testing.T) { + // Setup a log manager without a consumer. + logManager := setupLogManager(t, ctx, nil) + + // Append four log entries. + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-1")}) // LSN 1 + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-2")}) // LSN 2 + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-3")}) // LSN 3 + appendLogEntry(t, logManager, map[string][]byte{"1": []byte("content-4")}) // LSN 4 + + // Acknowledge LSN 1 as applied so that low water mark = 2. + require.NoError(t, logManager.AcknowledgePosition(AppliedPosition, 1)) + waitUntilPruningFinish(t, logManager) + require.Equal(t, storage.LSN(2), logManager.LowWaterMark()) + + // Launch concurrent deletion calls. + const numRoutines = 3 + var wg sync.WaitGroup + errCh := make(chan error, numRoutines) + + for i := 1; i <= numRoutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + // All routines try deleting from LSN 3. + errCh <- logManager.DeleteTrailingLogEntries(3) + }() + } + wg.Wait() + close(errCh) + + // Ensure that all calls returned without error. + for err := range errCh { + require.NoError(t, err) + } + + // appendedLSN should now be 2 because deletion starting at LSN 3 should remove tail entries. + require.Equal(t, storage.LSN(2), logManager.AppendedLSN()) + + // Assert that only entries LSN 2. + assertDirectoryState(t, logManager, testhelper.DirectoryState{ + "/": { + Mode: mode.Directory, + }, + "/wal": { + Mode: mode.Directory, + }, + "/wal/0000000000002": { + Mode: mode.Directory, + }, + "/wal/0000000000002/1": { + Mode: mode.File, + Content: []byte("content-2"), + }, + }) + }) +} -- GitLab From d1cc9f2cde45e75a6d917e387a0e5784fd3e8753 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 24 Jan 2025 16:51:58 +0700 Subject: [PATCH 2/3] raft: Add doc for Gitaly multi-raft architecture Most important components of raftmgr will be added in some later commits. This commit adds a doc for Gitaly multi-raft architecture to `doc/raft`. 
This doc will guide the implementation of those components. --- doc/raft.md | 173 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 doc/raft.md diff --git a/doc/raft.md b/doc/raft.md new file mode 100644 index 00000000000..9505ee1d5f4 --- /dev/null +++ b/doc/raft.md @@ -0,0 +1,173 @@ +# Gitaly's Multi-Raft Architecture + +## Overview + +The unit of Git data storage is the **repository**. Repositories in a Gitaly cluster are hosted within **storages**, each +with a globally unique name. A Gitaly server can host multiple storages simultaneously. Downstream services, +particularly GitLab Rails, maintain a mapping between repositories, storages, and the addresses of the servers hosting +them. + +Within each storage, repositories are grouped into **partitions**. A partition may consist of a single repository or a +collection of related repositories, such as those belonging to the same object pool in the case of a fork network. +Write-Ahead Logging (WAL) operates at the partition level, with all repositories in a partition sharing the same +monotonic log sequence number (LSN). Partition log entries are applied sequentially. + +The `raftmgr` package manages each partition in a separate Raft consensus group (Raft group), which operates +independently. Partitions are replicated across multiple storages. Storage is where partitions are placed. If a +storage creates a partition, it will mint its ID, but there's no special relationship between the partition and +storage. A partition can be moved freely to another storage. At any given time, a single Gitaly server can host thousands +or even hundreds of thousands of Raft groups. + +Partitions are replicated across multiple storages. A storage can host both its own partitions and replicas of +partitions from other storages. + +The data size and workload of partitions vary significantly. For example, a partition containing a monorepository and +its forks can dominate a server's resource usage and traffic. As a result, replication factors and routing tables are +designed at the partition level. Each partition can define its own replication constraints, such as locality and +optional storage capacity (e.g., HDD or SSD). + +## Partition Identity and Membership Management + +Each partition in a cluster is identified by a globally unique **Partition ID**. The Partition ID of a partition is generated +once when it is created. Under the hood, the Partition ID is a tuple of `(Authority Name, Local Partition ID)`, in which: + +- Authority Name: Represents the storage that created the partition. +- Local Partition ID: A local, monotonic identifier for the partition within the authority's storage. + +This identity system allows distributed partition creation without the need for a global ID registry. Although the +authority storage name is a part of the Partition ID, it does not imply ownership of the partition. A partition can move +freely to other storages after creation. + +Each partition is a Raft group with one or more members. Raft groups are also identified by the Partition ID due to +the one-to-one relationship. + +The `raftmgr.Manager` oversees all Raft activities for a Raft group member. Internally, etcd/raft assigns an +integer **Node ID** to a Raft group member. The Node ID does not change throughout the life cycle of a group member. 
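+
+A minimal sketch of how these identifiers could be modeled is shown below. The Go type and field names are
+hypothetical and used for illustration only; the actual definitions in `raftmgr` may differ.
+
+```go
+// PartitionID identifies a partition cluster-wide without requiring a global ID registry.
+type PartitionID struct {
+	AuthorityName    string // name of the storage that created the partition
+	LocalPartitionID uint64 // monotonic identifier local to the authority's storage
+}
+
+// NodeID is the integer identifier etcd/raft assigns to a member of a single Raft group.
+type NodeID uint64
+```
+
+Because the local identifier is only unique within its authority, storages can create partitions concurrently without
+coordinating on a shared counter.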
+ +When a partition is bootstrapped for the first time, the Raft manager initializes the `etcd/raft` state machine, +elects itself as the initial leader, and persists all Raft metadata to persistent storage. Its internal Node ID +is always 1 at this stage, making it a fully functional single-node Raft instance. + +When a new member joins a Raft group, the leader issues a Config Change entry. This entry contains the metadata +of the storage, such as storage name, address, and authentication/authorization info. The new member's Node ID +is assigned the LSN of this log entry, ensuring unambiguous identification of members. As the LSN is monotonic, +Node IDs are never reused even if the storage re-joins later. This Node ID system is not exposed outside the +scope of `etcd/raft` integration. + +Since Gitaly follows a multi-Raft architecture, the Node ID alone is insufficient to precisely locate a +partition or Raft group replica. Therefore, each replica (including the leader) is identified using a +**Replica ID**, which consists of `(Partition ID, Node ID, Replica Storage Name)`. + +To ensure fault tolerance in a quorum-based system, a Raft cluster requires a minimum replication factor of 3. +It can tolerate up to `(n-1)/2` failures. For example, a 3-replica cluster can tolerate 1 failure. + +The cluster maintains a global, eventually consistent **routing table** for lookup and routing purposes. Each entry +in the table includes: + +```plaintext +Partition ID: [ + RelativePath: "@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git" + Replicas: [ + + + + ] + Term: 99 + Index: 80 +} +``` + +The current leader of the Raft group is responsible for updating the respective entry in the routing table. The routing +table is updated only if the replica set changes. Leadership changes are not broadcasted due to the high number of Raft +groups and the high update frequency. The routing table and advertised storage addresses are propagated via the +gossip network. + +The `RelativePath` field in the routing table aims to maintain backward compatibility with GitLab-specific clients. It is +removed after clients fully transition to the new Partition ID system. The `Term` and `Index` fields ensure the order of +updates. Entries of the same Partition ID with either a lower term or index are replaced by newer ones. + +The following chart illustrates a simplified semantic organization of data (not actual data placement) on a Gitaly +server. + +```plaintext +gitaly-server-1344 +|_ storage-a + |_ + |_ repository @hashed/aa/aabbcc...git + |_ repository @hashed/bb/bbccdd...git (fork of @hashed/aa/aabbcc...git) + |_ repository @hashed/cc/ccddee...git (fork of @hashed/aa/aabbcc...git) + |_ (current leader) + |_ repository @hashed/dd/ddeeff...git + |_ + |_ repository @hashed/ee/eeffgg...git + |_ (current leader) + |_ repository @hashed/ff/ffgghh...git + |_ + |_ repository @hashed/gg/gghhii...git + |_ ... +``` + +## Communication Between Members of a Raft Group + +The `etcd/raft` package does not include network communication implementation. Gitaly uses gRPC to transfer messages. +Messages are sent through a single RPC, `RaftService.SendMessage`, which enhances Raft messages with Gitaly's partition +identity metadata. Membership management is facilitated by another RPC (TBD). + +Given that a Gitaly server may host a large number of Raft groups, the "chatty" nature of the Raft protocol can +cause potential issues. Additionally, the protocol is sensitive to health-check failures. 
To mitigate these challenges, +Gitaly applies techniques such as batching node health checks and quiescing inactive Raft groups. + +## Communication Between Client and Gitaly Cluster (via Proxying) + +TBD + +## Interaction with Transactions and WAL + +The Raft manager implements the `storage.LogManager` interface. By default, the Transaction Manager uses `log.Manager`. +All log entries are appended to the filesystem WAL. Once an entry is persisted, it is ready to be applied by the +Transaction Manager. When Raft is enabled, `log.Manager` is replaced by `raftmgr.Manager` which manages the entire +commit flow, including network communications and quorum acknowledgment. + +The responsibilities of three critical components are: + +- `log.Manager`: Handles local log management. +- `raftmgr.Manager`: Manages distributed log management. +- `partition.TransactionManager`: Manages transactions, concurrency, snapshots, conflicts, and related tasks. + +The flow is illustrated in the following chart: + +```plaintext + ┌──────────┐ + │Local Disk│ + └────▲─────┘ + │ + log.Manager + ▲ +TransactionManager New Transaction │ Without Raft + │ ▼ │ + └─►Initialize─►txn.Commit()─►Verify─► AppendLogEntry()─────►Apply + │ │ + │ │ With Raft + ▼ ▼ + ┌─►Initialize─────...──────────────────Propose + │ │ +raftmgr.Manager etcd/raft state machine + + ┌───────┴────────┐ + ▼ ▼ + raftmgr.Storage raftmgr.Transport + │ │ + ▼ │ + log.Manager Network + │ │ + ┌─────▼────┐ ┌─────▼────┐ + │Local Disk│ │Raft Group│ + └──────────┘ └──────────┘ +``` + +## References + +- [Raft-based decentralized architecture for Gitaly Cluster](https://gitlab.com/groups/gitlab-org/-/epics/8903) +- [In Search of an Understandable Consensus Algorithm](https://raft.github.io/raft.pdf) +- [Consensus: Bridging Theory and Practice](https://github.com/ongardie/dissertation) +- [etcd/raft package documentation](https://pkg.go.dev/go.etcd.io/etcd/raft/v3) -- GitLab From c53a7d592587820333c10c4415a19b33ffcc8094 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 9 Jan 2025 09:33:42 +0700 Subject: [PATCH 3/3] raft: Implement Raft storage component MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A Raft-based cluster requires persistent storage to maintain consensus data across restarts. This commit implements the Raft storage component that integrates with Gitaly's write-ahead while providing additional Raft-specific data persistence. The storage implementation serves several purposes: - Implements etcd/raft's Storage interface for log entry access - Adds persistence of Raft states (hard state, config state) - Tracks and acknowledges committed entries and snapshots - Manages log entry metadata in both WAL and KV store Key design considerations: - Reuses existing WAL infrastructure (log.Manager) for local log entry persistence. - Uses KV store for Raft-specific states. A new internal LSN (committedLSN) was introduced. This index tracks the latest acked LSN by the Raft group. It is used to delete persisted but not acked log entries after a restart. This pruning is important because we don't the primary node to apply non-acked entries. - Extends log entry format to include Raft metadata (term, index) Directory structure for log entries: Without Raft: With Raft: └── MANIFEST └── MANIFEST └── 1 └── RAFT └── 2 └── 1 └── ... └── 2 └── ... 
--- go.sum | 4 + internal/gitaly/storage/raftmgr/storage.go | 512 +++++++ .../gitaly/storage/raftmgr/storage_test.go | 1177 +++++++++++++++++ 3 files changed, 1693 insertions(+) create mode 100644 internal/gitaly/storage/raftmgr/storage.go create mode 100644 internal/gitaly/storage/raftmgr/storage_test.go diff --git a/go.sum b/go.sum index dad5d565851..3cef3e16fdf 100644 --- a/go.sum +++ b/go.sum @@ -190,6 +190,8 @@ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnht github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8ETbOasdwEV+avkR75ZzsVV9WI= github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA= +github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/containerd/cgroups/v3 v3.0.5 h1:44na7Ud+VwyE7LIoJ8JTNQOa549a8543BmzaJHo6Bzo= github.com/containerd/cgroups/v3 v3.0.5/go.mod h1:SA5DLYnXO8pTGYiAHXz94qvLQTKfVM5GEVisn4jpins= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= @@ -660,6 +662,8 @@ gitlab.com/gitlab-org/go/reopen v1.0.0 h1:6BujZ0lkkjGIejTUJdNO1w56mN1SI10qcVQyQl gitlab.com/gitlab-org/go/reopen v1.0.0/go.mod h1:D6OID8YJDzEVZNYW02R/Pkj0v8gYFSIhXFTArAsBQw8= gitlab.com/gitlab-org/labkit v1.21.2 h1:GlFHh8OdkrIMH3Qi0ByOzva0fGYXMICsuahGpJe4KNQ= gitlab.com/gitlab-org/labkit v1.21.2/go.mod h1:Q++SWyCH/abH2pytnX2SU/3mrCX6aK/xKz/WpM1hLbA= +go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q= +go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E= go.etcd.io/etcd/raft/v3 v3.5.16 h1:zBXA3ZUpYs1AwiLGPafYAKKl/CORn/uaxYDwlNwndAk= go.etcd.io/etcd/raft/v3 v3.5.16/go.mod h1:P4UP14AxofMJ/54boWilabqqWoW9eLodl6I5GdGzazI= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/internal/gitaly/storage/raftmgr/storage.go b/internal/gitaly/storage/raftmgr/storage.go new file mode 100644 index 00000000000..f7772dd4529 --- /dev/null +++ b/internal/gitaly/storage/raftmgr/storage.go @@ -0,0 +1,512 @@ +package raftmgr + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/dgraph-io/badger/v4" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" + "gitlab.com/gitlab-org/gitaly/v16/internal/safe" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +var ( + // RaftCommittedPosition tracks the highest log entry known to be committed in the Raft consensus. This position + // is used by the log manager for cleaning up committed entries and maintaining cluster consistency. + RaftCommittedPosition = storage.PositionType{Name: "RaftCommittedPosition", ShouldNotify: false} + + // RaftSnapshotPosition tracks the latest log entry included in a Raft snapshot. This position enables efficient + // log truncation and recovery by indicating which entries can be safely removed from the write-ahead log. 
+ // Currently, this position matches the committed position as auto-compaction is not yet implemented. This will + // change with the implementation of auto-compaction in the following issue: + // https://gitlab.com/gitlab-org/gitaly/-/issues/6463 + RaftSnapshotPosition = storage.PositionType{Name: "RaftSnapshotPosition", ShouldNotify: false} + + // KeyHardState is the database key for storing the member's current Raft hard state. + // This state must be persisted before sending any messages to ensure consistency. + KeyHardState = []byte("raft/hard_state") + + // KeyConfState is the database key for storing the current Raft configuration state. + // This state represents the cluster membership and is used when generating snapshots. + KeyConfState = []byte("raft/conf_state") + + // KeyLastConfigChange denotes the LSN of the last config change entry. + KeyLastConfigChange = []byte("raft/latest_config_change") + + // RaftDBKeys contain the list of Raft-related DB keys. + RaftDBKeys = [][]byte{KeyHardState, KeyConfState, KeyLastConfigChange} +) + +// Storage implements the raft.Storage interface and manages the persistence of Raft state +// in coordination with etcd/raft and log.Manager. +// +// During the lifecycle of etcd/raft, the library requests the application to persist two types of data: +// 1. Log entries to replicate to other members, along with additional Raft-specific metadata +// (e.g., hard state, config state). +// 2. The latest state of the Raft group, such as hard state, config state, snapshot state, etc. +// +// To persist the first type of data, Storage writes an additional RAFT file encapsulating +// this metadata to the staging directory of the target entry. It then coordinates with log.Manager +// to finalize the log appending operation. Later, the etcd/raft library retrieves the log entry and +// its associated Raft metadata. The second type of data is persisted in the key-value database. +// +// Log entry directory without Raft: +// |_ MANIFEST -> gitalypb.LogEntry +// |_ 1 +// |_ 2 +// |_ ... +// +// Log entry directory with Raft: +// |_ MANIFEST -> gitalypb.LogEntry +// |_ RAFT -> raftpb.Entry +// |_ 1 +// |_ 2 +// |_ ... +// +// In the hard state provided by etcd/raft state machine, the committed index (let's called it +// committedLSN) is crucial. It tracks the latest LSN (Log Sequence Number) acknowledged by +// the Raft group's quorum. This index complements the existing appendedLSN managed by log.Manager. +// When a log entry is appended, it goes through two distinct stages: +// 1. Appending to the local WAL (via log.Manager) and receiving an associated LSN. At this stage, +// the log entry is persisted but cannot yet be applied. +// 2. The leader sends the log entry to each member of the Raft group. If the quorum +// acknowledges that they have persisted the entry, the leader marks it as "committed." At this +// point, the entry is ready to be applied by the leader. Followers will apply it after +// receiving the next update from the leader. +// +// With the introduction of committedLSN, Storage proxies certain functionalities of +// log.Manager, particularly log consumption. In log.Manager, the consumer is notified +// immediately after a log entry is appended to the local WAL. However, with Raft, the +// consumer is notified only when the log entry is committed. 
+/* + ┌─ Last Raft snapshot taken + │ ┌─Consumer not acknowledged + │ │ ┌─ Applied til this point + │ │ │ committedLSN appendedLSN + │ │ │ │ │ +┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ +└─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ + ◄───────────► ◄───────────► + Can remove Need confirmed from quorum + Not ready to be used +*/ +type Storage struct { + ctx context.Context + mutex sync.Mutex + storageName string + partitionID storage.PartitionID + database keyvalue.Transactioner + localLog *log.Manager + committedLSN storage.LSN + lastTerm uint64 + consumer storage.LogConsumer + stagingDir string +} + +// raftManifestPath returns the path to the manifest file within a log entry directory. The manifest file contains +// metadata about the Raft log entry, such as its term, index, the marshaled content of gitalypb.RaftEntry, and so on. +// This file is stored in the log entry directory alongside the MANIFEST file. It is created as part of the log +// appending operation. The etcd/raft library requires the application to persist this metadata to enable later +// retrieval. +func raftManifestPath(logEntryPath string) string { + return filepath.Join(logEntryPath, "RAFT") +} + +// NewStorage creates and initializes a new Storage instance. +func NewStorage( + storageName string, + partitionID storage.PartitionID, + db keyvalue.Transactioner, + stagingDirectory string, + stateDirectory string, + consumer storage.LogConsumer, + positionTracker *log.PositionTracker, +) (*Storage, error) { + if err := positionTracker.Register(RaftCommittedPosition); err != nil { + return nil, fmt.Errorf("registering committed position: %w", err) + } + if err := positionTracker.Register(RaftSnapshotPosition); err != nil { + return nil, fmt.Errorf("registering snapshot position: %w", err) + } + + // Initialize the local log manager without a consumer since notifications + // should only be sent when entries are committed, not when they're appended + localLog := log.NewManager( + storageName, + partitionID, + stagingDirectory, + stateDirectory, + nil, + positionTracker, + ) + + return &Storage{ + database: db, + storageName: storageName, + partitionID: partitionID, + localLog: localLog, + consumer: consumer, + stagingDir: stagingDirectory, + }, nil +} + +// initialize loads all states from DB and disk. It also checks whether the leader has completed its initial bootstrap +// process by verifying the existence of a saved hard state. +func (s *Storage) initialize(ctx context.Context, appliedLSN storage.LSN) (bool, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.ctx = ctx + if err := s.localLog.Initialize(ctx, appliedLSN); err != nil { + return false, fmt.Errorf("initializing local log manager: %w", err) + } + + // Try to load the previous Raft hard state + var hardState raftpb.HardState + if err := s.readKey(KeyHardState, func(value []byte) error { + return hardState.Unmarshal(value) + }); err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + // No previous state exists - this is a fresh installation + return false, nil + } + // If the hard state is never persisted, it means the Raft group is never bootstrapped. 
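+		// Any other error while reading the hard state is unexpected and is returned to the caller as-is.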
+ return false, err + } + + // Load the last committed LSN + s.committedLSN = storage.LSN(hardState.Commit) + if s.committedLSN != 0 { + // Update both commit and snapshot positions to match the last committed LSN + if err := s.localLog.AcknowledgePosition(RaftCommittedPosition, s.committedLSN); err != nil { + return false, fmt.Errorf("acknowledging committed position: %w", err) + } + if err := s.localLog.AcknowledgePosition(RaftSnapshotPosition, s.committedLSN); err != nil { + return false, fmt.Errorf("acknowledging committed position: %w", err) + } + + if s.consumer != nil { + s.consumer.NotifyNewEntries(s.storageName, s.partitionID, s.localLog.LowWaterMark(), s.committedLSN) + } + } + s.lastTerm = hardState.Term + + return true, nil +} + +func (s *Storage) close() error { + return s.localLog.Close() +} + +// Entries implements raft.Storage's Entries(). It returns the list of entries which are still managed of range [lo, hi) +func (s *Storage) Entries(lo uint64, hi uint64, maxSize uint64) ([]raftpb.Entry, error) { + firstLSN := uint64(s.localLog.LowWaterMark()) + lastLSN := uint64(s.localLog.AppendedLSN()) + if lo < firstLSN { + return nil, raft.ErrCompacted + } + if firstLSN > lastLSN { + return nil, raft.ErrUnavailable + } + if hi > lastLSN+1 { + return nil, fmt.Errorf("reading out-of-bound entries %d > %d", hi, lastLSN+1) + } + + boundary := hi - 1 + if maxSize != 0 { + boundary = min(lo+maxSize-1, hi-1) + } + + var entries []raftpb.Entry + + for lsn := lo; lsn <= boundary; lsn++ { + entry, err := s.readRaftEntry(storage.LSN(lsn)) + if err != nil { + return nil, fmt.Errorf("reading raft entry: %w", err) + } + entries = append(entries, entry) + } + return entries, nil +} + +// InitialState retrieves the initial Raft HardState and ConfState from persistent storage. It is used to initialize the +// Raft state machine with the previously saved state. +func (s *Storage) InitialState() (raftpb.HardState, raftpb.ConfState, error) { + hardState, err := s.readHardState() + if err != nil { + return raftpb.HardState{}, raftpb.ConfState{}, fmt.Errorf("reading hard state: %w", err) + } + + confState, err := s.readConfState() + if err != nil { + return raftpb.HardState{}, raftpb.ConfState{}, fmt.Errorf("reading conf state: %w", err) + } + + return hardState, confState, nil +} + +// LastIndex returns the last index of all entries currently available in the log. +// This corresponds to the last LSN in the write-ahead log. +func (s *Storage) LastIndex() (uint64, error) { + return uint64(s.localLog.AppendedLSN()), nil +} + +// FirstIndex returns the first index of all entries currently available in the log. +// This corresponds to the first LSN in the write-ahead log. +func (s *Storage) FirstIndex() (uint64, error) { + return uint64(s.localLog.LowWaterMark()), nil +} + +// Snapshot returns the latest snapshot of the state machine. As we haven't supported autocompaction feature, this +// method always returns Unavailable error. +// For more information: https://gitlab.com/gitlab-org/gitaly/-/issues/6463 +func (s *Storage) Snapshot() (raftpb.Snapshot, error) { + return raftpb.Snapshot{}, raft.ErrSnapshotTemporarilyUnavailable +} + +// Term returns the term of the entry at a given index. +func (s *Storage) Term(i uint64) (uint64, error) { + firstLSN := uint64(s.localLog.LowWaterMark()) + lastLSN := uint64(s.localLog.AppendedLSN()) + if i > lastLSN { + return 0, raft.ErrUnavailable + } else if i < firstLSN { + // This also means lastLSN < firstLSN. 
There are two scenarios that lead to this condition: + // - The WAL is completely empty, likely because the Raft group hasn't been bootstrapped. In this case, + // this method can simply return 0. + // - All log entries have been pruned after a restart. + // + // The second scenario is more complex. The Raft state machine frequently queries the term of the latest + // log entry, especially after etcd/raft's node restarts. In theory, the content of the latest log entry + // must be preserved even after being processed. However, this approach is impractical. It could cause + // inactive partitions to retain log entries indefinitely until new entries are received. + // + // To address this, the term of the last log entry is maintained in memory. Its value is derived from + // the persisted hard state when the Raft manager restarts. After a new entry is persisted, the value is + // updated. + if i == lastLSN { + return s.lastTerm, nil + } + return 0, raft.ErrCompacted + } + + raftEntry, err := s.readRaftEntry(storage.LSN(i)) + if err != nil { + return 0, fmt.Errorf("read log entry term: %w", err) + } + return raftEntry.Term, nil +} + +// setKey marshals and stores a given protocol buffer message into the database under the given key. +func (s *Storage) setKey(key []byte, value []byte) error { + return s.database.Update(func(tx keyvalue.ReadWriter) error { + return tx.Set(key, value) + }) +} + +// readKey reads a key from the database and unmarshals its value in to the destination protocol +// buffer message. +func (s *Storage) readKey(key []byte, unmarshal func([]byte) error) error { + return s.database.View(func(txn keyvalue.ReadWriter) error { + item, err := txn.Get(key) + if err != nil { + return fmt.Errorf("get: %w", err) + } + + return item.Value(unmarshal) + }) +} + +// saveConfState persists the current Raft configuration state to disk, ensuring that configuration changes are durable. +func (s *Storage) saveHardState(hardState raftpb.HardState) error { + marshaled, err := hardState.Marshal() + if err != nil { + return fmt.Errorf("marshaling hard state: %w", err) + } + committedLSN := storage.LSN(hardState.Commit) + + if err := func() error { + s.mutex.Lock() + defer s.mutex.Unlock() + + if committedLSN > s.localLog.AppendedLSN() { + return fmt.Errorf("next committed LSN exceeds appended LSN %d > %d", committedLSN, s.localLog.AppendedLSN()) + } + + if err := s.setKey(KeyHardState, marshaled); err != nil { + return fmt.Errorf("setting hard state key: %w", err) + } + + if err := s.localLog.AcknowledgePosition(RaftCommittedPosition, committedLSN); err != nil { + return fmt.Errorf("acknowledging committed position: %w", err) + } + // Auto-compaction and snapshot are not yet supported. So, the snapshot position will always be the same + // as the committed position. It means the underlying local log manager can prune log entries older than + // the snapshot position. 
+ if err := s.localLog.AcknowledgePosition(RaftSnapshotPosition, committedLSN); err != nil { + return fmt.Errorf("acknowledging snapshot position: %w", err) + } + s.committedLSN = committedLSN + + return nil + }(); err != nil { + return err + } + + if s.consumer != nil { + s.consumer.NotifyNewEntries(s.storageName, s.partitionID, s.localLog.LowWaterMark(), committedLSN) + } + + return nil +} + +func (s *Storage) readHardState() (raftpb.HardState, error) { + var hardState raftpb.HardState + if err := s.readKey(KeyHardState, func(value []byte) error { + return hardState.Unmarshal(value) + }); err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return raftpb.HardState{}, nil + } + return raftpb.HardState{}, err + } + return hardState, nil +} + +// saveConfState persists latest conf state to. It is used when generating snapshot. +func (s *Storage) saveConfState(confState raftpb.ConfState) error { + marshaled, err := confState.Marshal() + if err != nil { + return fmt.Errorf("marshaling conf state: %w", err) + } + return s.setKey(KeyConfState, marshaled) +} + +func (s *Storage) readConfState() (raftpb.ConfState, error) { + var confState raftpb.ConfState + if err := s.readKey(KeyConfState, func(value []byte) error { + return confState.Unmarshal(value) + }); err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return raftpb.ConfState{}, nil + } + return raftpb.ConfState{}, err + } + return confState, nil +} + +// readRaftEntry returns the Raft metadata from the given position in the log. +func (s *Storage) readRaftEntry(lsn storage.LSN) (raftpb.Entry, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + var raftEntry raftpb.Entry + + marshaledBytes, err := os.ReadFile(raftManifestPath(s.localLog.GetEntryPath(lsn))) + if err != nil { + return raftEntry, err + } + + if err := raftEntry.Unmarshal(marshaledBytes); err != nil { + return raftEntry, fmt.Errorf("unmarshal term: %w", err) + } + + return raftEntry, nil +} + +// draftLogEntry drafts a log entry and inserts it to WAL at a certain position. The caller passes a callback function +// for setting the content of the log entry. +func (s *Storage) draftLogEntry(raftEntry raftpb.Entry, callback func(*wal.Entry) error) (returnedErr error) { + // Create a temp directory for drafting log entry. This directory will be moved to the state directory of the + // local log manager. It's only cleaned up if there's an error along the way. + logEntryPath, err := os.MkdirTemp(s.stagingDir, "") + if err != nil { + return fmt.Errorf("mkdir temp: %w", err) + } + defer func() { + if returnedErr != nil { + returnedErr = errors.Join(returnedErr, os.RemoveAll(logEntryPath)) + } + }() + + // Draft a manifest and let the caller sets its content. + walEntry := wal.NewEntry(logEntryPath) + if err := callback(walEntry); err != nil { + return fmt.Errorf("modifying wal entry: %w", err) + } + + // Write the manifest file. + if err := wal.WriteManifest(s.ctx, walEntry.Directory(), &gitalypb.LogEntry{ + Operations: walEntry.Operations(), + }); err != nil { + return fmt.Errorf("writing manifest file: %w", err) + } + + // Finally, insert it to WAL. + return s.insertLogEntry(raftEntry, logEntryPath) +} + +// insertLogEntry inserts a log entry to WAL at a certain position with respective Raft metadata. 
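+// If the target index already exists in the WAL (for example, when a new leader replaces uncommitted entries with
+// entries of a higher term), the trailing entries from that index onward are removed before the new entry is appended.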
+func (s *Storage) insertLogEntry(raftEntry raftpb.Entry, logEntryPath string) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + lsn := storage.LSN(raftEntry.Index) + // Although etcd/raft allows inserting log entry at a pre-existing position, it should not be less than the + // committed LSN. Committed entries are properly applied to the persistent storage of this member. Thus, there's + // nothing we can do about that except for rejecting the entry. The Raft protocol should guarantee that this + // situation never happens. + if lsn < s.committedLSN { + return fmt.Errorf("inserted LSN at the point lower than committed LSN") + } + + // It's normal for etcd/raft to request a log entry to be inserted to an existing index in the WAL. + // That can occur when a log entry is not committed by the quorum, due to a network parity for example. The new + // leader will send new log entries with a higher term to all members in the group for acknowledgement. All + // members should then replace obsoleted entries with new ones. All log entries after replaced log entry should + // also be eventually removed. + if lsn <= s.localLog.AppendedLSN() { + if err := s.localLog.DeleteTrailingLogEntries(lsn); err != nil { + return fmt.Errorf("deleting trailing log entries: %w", err) + } + } + + marshaledEntry, err := raftEntry.Marshal() + if err != nil { + return fmt.Errorf("marshaling raft entry: %w", err) + } + + // Finalize the log entry by writing the RAFT file into the log entry's directory. + manifestPath := raftManifestPath(logEntryPath) + if err := os.WriteFile(manifestPath, marshaledEntry, mode.File); err != nil { + return fmt.Errorf("writing raft manifest file: %w", err) + } + if err := safe.NewSyncer().Sync(s.ctx, manifestPath); err != nil { + return fmt.Errorf("sync raft manifest file: %w", err) + } + if err := safe.NewSyncer().SyncParent(s.ctx, manifestPath); err != nil { + return fmt.Errorf("sync raft manifest parent: %w", err) + } + if _, err = s.localLog.CompareAndAppendLogEntry(lsn, logEntryPath); err != nil { + return fmt.Errorf("inserting log entry to WAL: %w", err) + } + s.lastTerm = raftEntry.Term + return nil +} + +func (s *Storage) readLogEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { + return wal.ReadManifest(s.localLog.GetEntryPath(lsn)) +} + +// Compile-time type check. 
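+// The conversion fails to compile if *Storage stops satisfying raft.Storage.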
+var _ = (raft.Storage)(&Storage{}) diff --git a/internal/gitaly/storage/raftmgr/storage_test.go b/internal/gitaly/storage/raftmgr/storage_test.go new file mode 100644 index 00000000000..9322bfa0986 --- /dev/null +++ b/internal/gitaly/storage/raftmgr/storage_test.go @@ -0,0 +1,1177 @@ +package raftmgr + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/raftpb" +) + +type mockConsumer struct { + notifications []mockNotification + mutex sync.Mutex +} + +type mockNotification struct { + storageName string + partitionID storage.PartitionID + lowWaterMark storage.LSN + highWaterMark storage.LSN +} + +func (mc *mockConsumer) NotifyNewEntries(storageName string, partitionID storage.PartitionID, lowWaterMark, committedLSN storage.LSN) { + mc.mutex.Lock() + defer mc.mutex.Unlock() + mc.notifications = append(mc.notifications, mockNotification{ + storageName: storageName, + partitionID: partitionID, + lowWaterMark: lowWaterMark, + highWaterMark: committedLSN, + }) +} + +func (mc *mockConsumer) GetNotifications() []mockNotification { + mc.mutex.Lock() + defer mc.mutex.Unlock() + return mc.notifications +} + +func getTestDBManager(t *testing.T, ctx context.Context, cfg config.Cfg) keyvalue.Transactioner { + t.Helper() + + logger := testhelper.NewLogger(t) + + dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) + require.NoError(t, err) + t.Cleanup(dbMgr.Close) + + db, err := dbMgr.GetDB(cfg.Storages[0].Name) + require.NoError(t, err) + + return db +} + +func setupStorage(t *testing.T, ctx context.Context, cfg config.Cfg) *Storage { + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg) + posTracker := log.NewPositionTracker() + rs, err := NewStorage("test-storage", 1, db, stagingDir, stateDir, &mockConsumer{}, posTracker) + require.NoError(t, err) + + bootstrapped, err := rs.initialize(ctx, 0) + require.NoError(t, err) + require.False(t, bootstrapped) + + t.Cleanup(func() { require.NoError(t, rs.close()) }) + return rs +} + +func prepopulateEntries(t *testing.T, ctx context.Context, cfg config.Cfg, stagingDir, stateDir string, num int) { + logManager := log.NewManager(cfg.Storages[0].Name, 1, stagingDir, stateDir, nil, log.NewPositionTracker()) + require.NoError(t, logManager.Initialize(ctx, 0)) + for i := 1; i <= num; i++ { + entryLSN := storage.LSN(i) + entryDir := testhelper.TempDir(t) + _, err := logManager.CompareAndAppendLogEntry(entryLSN, entryDir) + require.NoError(t, err) + } + require.NoError(t, logManager.Close()) +} + +func TestStorage_Initialize(t *testing.T) { + t.Parallel() + + prepopulateStorage := func(t *testing.T, ctx context.Context, cfg config.Cfg, appended int, committed uint64) (keyvalue.Transactioner, string) { + 
stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg) + posTracker := log.NewPositionTracker() + + // Pre-populate n entries + prepopulateEntries(t, ctx, cfg, stagingDir, stateDir, appended) + + rs, err := NewStorage("test-storage", 1, db, stagingDir, stateDir, &mockConsumer{}, posTracker) + require.NoError(t, err) + + _, err = rs.initialize(ctx, 0) + require.NoError(t, err) + // Set on-disk commit LSN to n + require.NoError(t, rs.saveHardState(raftpb.HardState{Term: 2, Vote: 1, Commit: committed})) + require.NoError(t, rs.close()) + + return db, stateDir + } + + t.Run("raft storage is never bootstrapped", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg) + posTracker := log.NewPositionTracker() + + rs, err := NewStorage("test-storage", 1, db, stagingDir, stateDir, &mockConsumer{}, posTracker) + require.NoError(t, err) + + bootstrapped, err := rs.initialize(ctx, 0) + require.NoError(t, err) + require.False(t, bootstrapped, "expected fresh installation (bootstrapped == false)") + + firstIndex, err := rs.FirstIndex() + require.NoError(t, err) + require.Equal(t, uint64(1), firstIndex) + + lastIndex, err := rs.LastIndex() + require.NoError(t, err) + require.Equal(t, uint64(0), lastIndex) + + require.Empty(t, rs.consumer.(*mockConsumer).GetNotifications()) + + require.NoError(t, rs.close()) + }) + + t.Run("raft storage was bootstrapped, no left-over log entries after restart", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + // Simulate a prior session + db, stateDir := prepopulateStorage(t, ctx, cfg, 3, 3) + + // Restart the storage using the same state dir + rs, err := NewStorage("test-storage", 1, db, testhelper.TempDir(t), stateDir, &mockConsumer{}, log.NewPositionTracker()) + require.NoError(t, err) + + // Initialize + bootstrapped, err := rs.initialize(ctx, 3) + require.NoError(t, err) + require.True(t, bootstrapped, "expected bootstrapped installation") + require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, 3)) + + // Now the populated committedLSN is 3 + require.Equal(t, storage.LSN(3), rs.committedLSN) + + // First index is 4 (> last index) because all entries are being pruned + firstIndex, err := rs.FirstIndex() + require.NoError(t, err) + require.Equal(t, uint64(4), firstIndex) + + // Last index is also 3 + lastIndex, err := rs.LastIndex() + require.NoError(t, err) + require.Equal(t, uint64(3), lastIndex) + + // Notify for the first time. 
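+		// initialize sends a single notification covering the range from the low water mark up to the
+		// committed LSN restored from the persisted hard state.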
+ require.Equal(t, []mockNotification{ + { + storageName: rs.storageName, + partitionID: rs.partitionID, + lowWaterMark: storage.LSN(1), + highWaterMark: storage.LSN(3), + }, + }, rs.consumer.(*mockConsumer).GetNotifications()) + + require.NoError(t, rs.close()) + }) + + t.Run("raft storage was bootstrapped, some log entries are left over", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + // Simulate a prior session + db, stateDir := prepopulateStorage(t, ctx, cfg, 5, 3) + + rs, err := NewStorage("test-storage", 1, db, testhelper.TempDir(t), stateDir, &mockConsumer{}, log.NewPositionTracker()) + require.NoError(t, err) + + // Initialize with applied LSN 3 + bootstrapped, err := rs.initialize(ctx, 3) + require.NoError(t, err) + require.True(t, bootstrapped, "expected bootstrapped installation") + require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, 3)) + + // First index is 4 + firstIndex, err := rs.FirstIndex() + require.NoError(t, err) + require.Equal(t, uint64(4), firstIndex) + + // Last index is 5, equal to the latest appended LSN + lastIndex, err := rs.LastIndex() + require.NoError(t, err) + require.Equal(t, uint64(5), lastIndex) + + // Notify from low-water mark to the committedLSN for the first time. + require.Equal(t, []mockNotification{ + { + storageName: rs.storageName, + partitionID: rs.partitionID, + lowWaterMark: storage.LSN(1), + highWaterMark: storage.LSN(3), + }, + }, rs.consumer.(*mockConsumer).GetNotifications()) + + require.NoError(t, rs.close()) + }) +} + +func TestStorage_InitialState(t *testing.T) { + t.Parallel() + + t.Run("empty state returns defaults", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg) + posTracker := log.NewPositionTracker() + + rs, err := NewStorage("test-storage", 1, db, stagingDir, stateDir, nil, posTracker) + require.NoError(t, err) + + hs, cs, err := rs.InitialState() + require.NoError(t, err) + + // When no hard state was stored, we expect empty defaults. 
+ require.Equal(t, raftpb.HardState{}, hs) + require.Equal(t, raftpb.ConfState{}, cs) + }) + + t.Run("initial state exists", func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + stagingDir := testhelper.TempDir(t) + stateDir := testhelper.TempDir(t) + db := getTestDBManager(t, ctx, cfg) + posTracker := log.NewPositionTracker() + + prepopulateEntries(t, ctx, cfg, stagingDir, stateDir, 10) + + rs, err := NewStorage("test-storage", 1, db, stagingDir, stateDir, nil, posTracker) + require.NoError(t, err) + + _, err = rs.initialize(ctx, 0) + require.NoError(t, err) + + // Pre-populate the storage using abstractions + require.NoError(t, rs.saveHardState(raftpb.HardState{ + Term: 4, + Vote: 2, + Commit: 10, + })) + require.NoError(t, rs.saveConfState(raftpb.ConfState{ + Voters: []uint64{1, 2, 3}, + Learners: []uint64{4}, + })) + + hsOut, csOut, err := rs.InitialState() + require.NoError(t, err) + + // Compare the stored hard state and conf state + require.Equal(t, raftpb.HardState{ + Term: 4, + Vote: 2, + Commit: 10, + }, hsOut) + require.Equal(t, raftpb.ConfState{ + Voters: []uint64{1, 2, 3}, + Learners: []uint64{4}, + }, csOut) + }) +} + +func TestStorage_Entries(t *testing.T) { + setupEntries := func(t *testing.T, ctx context.Context, rs *Storage) { + entries := []raftpb.Entry{ + {Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: []byte("entry 1 - pruned")}, + {Term: 1, Index: 2, Type: raftpb.EntryNormal, Data: []byte("entry 2 - pruned")}, + {Term: 2, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")}, + {Term: 2, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4 - overwritten")}, + {Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")}, + {Term: 4, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5")}, + {Term: 4, Index: 6, Type: raftpb.EntryNormal, Data: []byte("entry 6")}, + } + + for _, entry := range entries { + logEntryPath := testhelper.TempDir(t) + + w := wal.NewEntry(logEntryPath) + w.SetKey( + []byte(fmt.Sprintf("key-%d-%d", entry.Term, entry.Index)), + []byte(fmt.Sprintf("value-%d-%d", entry.Term, entry.Index)), + ) + + require.NoError(t, wal.WriteManifest(ctx, w.Directory(), &gitalypb.LogEntry{ + Operations: w.Operations(), + })) + require.NoError(t, rs.insertLogEntry(entry, logEntryPath)) + } + // Set committedLSN and appliedLSN to 2. Log entry 1 and 2 are pruned. 
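+		// saveHardState acknowledges the committed and snapshot positions; together with the applied position
+		// acknowledged below, this lets the local log manager prune entries 1 and 2.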
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Term: 1, Vote: 1, Commit: 2}))
+		require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, 2))
+	}
+
+	t.Run("query all entries from empty WAL", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+
+		fetchedEntries, err := rs.Entries(firstIndex, lastIndex+1, 0)
+		require.ErrorIs(t, err, raft.ErrUnavailable)
+		require.Empty(t, fetchedEntries)
+	})
+
+	t.Run("query all entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+
+		fetchedEntries, err := rs.Entries(firstIndex, lastIndex+1, 0)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{
+			{Term: 2, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")},
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+			{Term: 4, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5")},
+			{Term: 4, Index: 6, Type: raftpb.EntryNormal, Data: []byte("entry 6")},
+		}, fetchedEntries)
+	})
+
+	t.Run("query all entries with a limit < available entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+
+		fetchedEntries, err := rs.Entries(firstIndex, lastIndex+1, 2)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{
+			{Term: 2, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")},
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+		}, fetchedEntries)
+	})
+
+	t.Run("query all entries with a limit == available entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+
+		fetchedEntries, err := rs.Entries(firstIndex, lastIndex+1, 4)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{
+			{Term: 2, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")},
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+			{Term: 4, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5")},
+			{Term: 4, Index: 6, Type: raftpb.EntryNormal, Data: []byte("entry 6")},
+		}, fetchedEntries)
+	})
+
+	t.Run("query all entries with a limit > available entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+
+		fetchedEntries, err := rs.Entries(firstIndex, lastIndex+1, 99)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{
+			{Term: 2, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")},
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+			{Term: 4, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5")},
+			{Term: 4, Index: 6, Type: raftpb.EntryNormal, Data: []byte("entry 6")},
+		}, fetchedEntries)
+	})
+
+	t.Run("query a subset of entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		fetchedEntries, err := rs.Entries(4, 6, 0)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+			{Term: 4, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5")},
+		}, fetchedEntries)
+	})
+
+	t.Run("query a subset of entries + limit", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		fetchedEntries, err := rs.Entries(4, 6, 1)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+		}, fetchedEntries)
+	})
+
+	t.Run("query compacted entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		fetchedEntries, err := rs.Entries(1, 6, 0)
+		require.ErrorIs(t, err, raft.ErrCompacted)
+		require.Empty(t, fetchedEntries)
+	})
+
+	t.Run("query unavailable entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		// No entries available.
+
+		fetchedEntries, err := rs.Entries(3, 6, 0)
+		require.ErrorIs(t, err, raft.ErrUnavailable)
+		require.Empty(t, fetchedEntries)
+	})
+
+	t.Run("query out-of-range entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		fetchedEntries, err := rs.Entries(3, 99, 0)
+		require.ErrorContains(t, err, "reading out-of-bound entries")
+		require.Empty(t, fetchedEntries)
+	})
+}
+
+func TestStorage_Term(t *testing.T) {
+	t.Parallel()
+
+	insertEntry := func(t *testing.T, ctx context.Context, rs *Storage, entry raftpb.Entry) {
+		logEntryPath := testhelper.TempDir(t)
+		w := wal.NewEntry(logEntryPath)
+		require.NoError(t, wal.WriteManifest(ctx, w.Directory(), &gitalypb.LogEntry{
+			Operations: w.Operations(),
+		}))
+		require.NoError(t, rs.insertLogEntry(entry, logEntryPath))
+	}
+
+	setupEntries := func(t *testing.T, ctx context.Context, rs *Storage) {
+		entries := []raftpb.Entry{
+			{Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: []byte("entry 1 - pruned")},
+			{Term: 1, Index: 2, Type: raftpb.EntryNormal, Data: []byte("entry 2")},
+			{Term: 2, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")},
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+		}
+
+		for _, entry := range entries {
+			insertEntry(t, ctx, rs, entry)
+		}
+		// Set committedLSN and appliedLSN to 1. Log entry 1 is pruned.
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Term: 1, Vote: 1, Commit: 1}))
+		require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, 1))
+	}
+
+	t.Run("query term of the last entry of an empty WAL", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+
+		term, err := rs.Term(lastIndex)
+		require.NoError(t, err)
+		require.Equal(t, uint64(0), term)
+	})
+
+	t.Run("query term of normal entries", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		term, err := rs.Term(2)
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), term)
+
+		term, err = rs.Term(3)
+		require.NoError(t, err)
+		require.Equal(t, uint64(2), term)
+
+		term, err = rs.Term(4)
+		require.NoError(t, err)
+		require.Equal(t, uint64(3), term)
+	})
+
+	t.Run("query term of a pruned entry", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		_, err := rs.Term(1)
+		require.ErrorIs(t, err, raft.ErrCompacted)
+	})
+
+	t.Run("query term of an entry beyond the last entry", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		rs := setupStorage(t, ctx, cfg)
+		setupEntries(t, ctx, rs)
+
+		_, err := rs.Term(5)
+		require.ErrorIs(t, err, raft.ErrUnavailable)
+	})
+
+	t.Run("query term of pruned entries after a restart", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg)
+
+		rs, err := NewStorage("test-storage", 1, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker())
+		require.NoError(t, err)
+
+		_, err = rs.initialize(ctx, 0)
+		require.NoError(t, err)
+		setupEntries(t, ctx, rs)
+
+		// Commit and apply all entries.
+		require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, 4))
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Term: 4, Vote: 1, Commit: 4}))
+		require.NoError(t, rs.close())
+
+		// Now restart the storage.
+		rs, err = NewStorage("test-storage", 1, db, stagingDir, stateDir, &mockConsumer{}, log.NewPositionTracker())
+		require.NoError(t, err)
+
+		_, err = rs.initialize(ctx, 4)
+		require.NoError(t, err)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+
+		// Log entry 4 is pruned. Its term is inferred from the last persisted hard state.
+		term, err := rs.Term(lastIndex)
+		require.NoError(t, err)
+		require.Equal(t, uint64(4), term)
+
+		// Insert another log entry and let it be pruned as well.
+		insertEntry(t, ctx, rs, raftpb.Entry{
+			Term: 99, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5"),
+		})
+
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Term: 1, Vote: 1, Commit: 5}))
+		require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, 5))
+
+		// First index > last index now. Log entry 5 is pruned.
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(6), firstIndex)
+
+		lastIndex, err = rs.LastIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(5), lastIndex)
+
+		// The term is queryable
+		term, err = rs.Term(lastIndex)
+		require.NoError(t, err)
+		require.Equal(t, uint64(99), term)
+	})
+}
+
+func TestStorage_insertLogEntry(t *testing.T) {
+	t.Parallel()
+
+	testAppendLogEntry(t, func(t *testing.T, ctx context.Context, rs *Storage, entry raftpb.Entry) error {
+		logEntryPath := testhelper.TempDir(t)
+
+		w := wal.NewEntry(logEntryPath)
+		w.SetKey(
+			[]byte(fmt.Sprintf("key-%d-%d", entry.Term, entry.Index)),
+			[]byte(fmt.Sprintf("value-%d-%d", entry.Term, entry.Index)),
+		)
+
+		require.NoError(t, wal.WriteManifest(ctx, w.Directory(), &gitalypb.LogEntry{
+			Operations: w.Operations(),
+		}))
+
+		return rs.insertLogEntry(entry, logEntryPath)
+	})
+}
+
+func TestStorage_draftLogEntry(t *testing.T) {
+	t.Parallel()
+
+	testAppendLogEntry(t, func(t *testing.T, ctx context.Context, rs *Storage, entry raftpb.Entry) error {
+		return rs.draftLogEntry(entry, func(w *wal.Entry) error {
+			w.SetKey(
+				[]byte(fmt.Sprintf("key-%d-%d", entry.Term, entry.Index)),
+				[]byte(fmt.Sprintf("value-%d-%d", entry.Term, entry.Index)),
+			)
+			return nil
+		})
+	})
+}
+
+func assertEntries(
+	t *testing.T,
+	rs *Storage,
+	expectedEntries []raftpb.Entry,
+	actualEntries []raftpb.Entry,
+) {
+	t.Helper()
+
+	require.Equal(t, len(expectedEntries), len(actualEntries))
+	for i, expectedEntry := range expectedEntries {
+		require.Equal(t, expectedEntry, actualEntries[i])
+
+		term, err := rs.Term(expectedEntry.Index)
+		require.NoError(t, err)
+		require.Equal(t, expectedEntry.Term, term)
+
+		logEntry, err := rs.readLogEntry(storage.LSN(expectedEntry.Index))
+		require.NoError(t, err)
+		testhelper.ProtoEqual(t, &gitalypb.LogEntry{
+			Operations: []*gitalypb.LogEntry_Operation{
+				{
+					Operation: &gitalypb.LogEntry_Operation_SetKey_{
+						SetKey: &gitalypb.LogEntry_Operation_SetKey{
+							Key:   []byte(fmt.Sprintf("key-%d-%d", expectedEntry.Term, expectedEntry.Index)),
+							Value: []byte(fmt.Sprintf("value-%d-%d", expectedEntry.Term, expectedEntry.Index)),
+						},
+					},
+				},
+			},
+		}, logEntry)
+	}
+}
+
+func testAppendLogEntry(t *testing.T, appendFunc func(*testing.T, context.Context, *Storage, raftpb.Entry) error) {
+	t.Run("insert a log entry", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		raftEntry := raftpb.Entry{
+			Term:  99,
+			Index: 1,
+			Type:  raftpb.EntryNormal,
+			Data:  []byte("content 1"),
+		}
+
+		require.NoError(t, appendFunc(t, ctx, rs, raftEntry))
+
+		entries, err := rs.Entries(1, 2, 0)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{raftEntry}, entries)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), firstIndex)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), lastIndex)
+	})
+
+	t.Run("insert multiple log entries in sequence", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		entries := []raftpb.Entry{
+			{Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: []byte("entry 1")},
+			{Term: 1, Index: 2, Type: raftpb.EntryNormal, Data: []byte("entry 2")},
+			{Term: 1, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")},
+		}
+
+		for _, entry := range entries {
+			require.NoError(t, appendFunc(t, ctx, rs, entry))
+		}
+
+		fetchedEntries, err := rs.Entries(1, 4, 0)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, entries, fetchedEntries)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), firstIndex)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(3), lastIndex)
+	})
+
+	t.Run("insert overlapping log entry", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		originalEntry := raftpb.Entry{Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: []byte("original")}
+		newEntry := raftpb.Entry{Term: 2, Index: 1, Type: raftpb.EntryNormal, Data: []byte("replacement")}
+
+		require.NoError(t, appendFunc(t, ctx, rs, originalEntry))
+		require.NoError(t, appendFunc(t, ctx, rs, newEntry))
+
+		fetchedEntries, err := rs.Entries(1, 2, 0)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{newEntry}, fetchedEntries)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), firstIndex)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), lastIndex)
+	})
+
+	t.Run("insert multiple overlapping entries with full range overlap", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		entriesBatches := []raftpb.Entry{
+			{Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: []byte("entry 1")},
+			{Term: 1, Index: 2, Type: raftpb.EntryNormal, Data: []byte("entry 2")},
+			{Term: 1, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3")},
+			{Term: 2, Index: 2, Type: raftpb.EntryNormal, Data: []byte("entry 2 - replacement")},
+			{Term: 2, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3 - replacement")},
+			{Term: 2, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4")},
+			{Term: 3, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3 - second replacement")},
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4 - replacement")},
+			{Term: 3, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5")},
+		}
+
+		for _, entry := range entriesBatches {
+			require.NoError(t, appendFunc(t, ctx, rs, entry))
+		}
+
+		// Final expected entries after resolving overlaps.
+		expectedEntries := []raftpb.Entry{
+			{Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: []byte("entry 1")},
+			{Term: 2, Index: 2, Type: raftpb.EntryNormal, Data: []byte("entry 2 - replacement")},
+			{Term: 3, Index: 3, Type: raftpb.EntryNormal, Data: []byte("entry 3 - second replacement")},
+			{Term: 3, Index: 4, Type: raftpb.EntryNormal, Data: []byte("entry 4 - replacement")},
+			{Term: 3, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry 5")},
+		}
+
+		// Validate that only the correct entries remain after overlaps.
+		fetchedEntries, err := rs.Entries(1, 6, 0)
+		require.NoError(t, err)
+		assertEntries(t, rs, expectedEntries, fetchedEntries)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), firstIndex)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(5), lastIndex)
+	})
+
+	t.Run("insert large log entry", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		largeData := make([]byte, 10*1024*1024) // 10MB payload
+		raftEntry := raftpb.Entry{Term: 1, Index: 1, Type: raftpb.EntryNormal, Data: largeData}
+
+		require.NoError(t, appendFunc(t, ctx, rs, raftEntry))
+
+		entries, err := rs.Entries(1, 2, 0)
+		require.NoError(t, err)
+
+		assertEntries(t, rs, []raftpb.Entry{raftEntry}, entries)
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), firstIndex)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), lastIndex)
+	})
+
+	t.Run("insert log entry beyond current LSN", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		raftEntry := raftpb.Entry{Term: 1, Index: 5, Type: raftpb.EntryNormal, Data: []byte("entry out of range")}
+		err := appendFunc(t, ctx, rs, raftEntry)
+
+		// Expecting an error as the LSN is beyond the current range.
+		require.Error(t, err, "expected error when inserting entry beyond current LSN")
+
+		firstIndex, err := rs.FirstIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), firstIndex)
+
+		lastIndex, err := rs.LastIndex()
+		require.NoError(t, err)
+		require.Equal(t, uint64(0), lastIndex)
+	})
+
+	t.Run("insert log entry below committed LSN", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+
+		stagingDir := testhelper.TempDir(t)
+		stateDir := testhelper.TempDir(t)
+		db := getTestDBManager(t, ctx, cfg)
+		posTracker := log.NewPositionTracker()
+
+		prepopulateEntries(t, ctx, cfg, stagingDir, stateDir, 10)
+
+		rs, err := NewStorage("test-storage", 1, db, stagingDir, stateDir, &mockConsumer{}, posTracker)
+		require.NoError(t, err)
+		_, err = rs.initialize(ctx, 0)
+		require.NoError(t, err)
+
+		require.NoError(t, rs.saveHardState(raftpb.HardState{
+			Term:   1,
+			Vote:   1,
+			Commit: 3,
+		}))
+
+		raftEntry := raftpb.Entry{Term: 1, Index: 2, Type: raftpb.EntryNormal, Data: []byte("entry below committed LSN")}
+
+		// Expecting an error as the entry's index is below the committed LSN.
+		require.Error(t, appendFunc(t, ctx, rs, raftEntry), "expected error when inserting entry below committed LSN")
+	})
+}
+
+func TestStorage_SaveHardState(t *testing.T) {
+	t.Parallel()
+
+	insertEntry := func(t *testing.T, ctx context.Context, rs *Storage, entry raftpb.Entry) error {
+		logEntryPath := testhelper.TempDir(t)
+
+		w := wal.NewEntry(logEntryPath)
+		require.NoError(t, wal.WriteManifest(ctx, w.Directory(), &gitalypb.LogEntry{
+			Operations: w.Operations(),
+		}))
+
+		return rs.insertLogEntry(entry, logEntryPath)
+	}
+
+	t.Run("advance committed LSN successfully", func(t *testing.T) {
+		t.Parallel()
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		// Pre-populate the log with entries.
+		entries := []raftpb.Entry{
+			{Index: 1, Term: 1},
+			{Index: 2, Term: 1},
+			{Index: 3, Term: 1},
+		}
+		for _, entry := range entries {
+			require.NoError(t, insertEntry(t, ctx, rs, entry))
+		}
+
+		// The consumer has not received any notifications yet. The highest appendedLSN is 3.
+		require.Empty(t, rs.consumer.(*mockConsumer).GetNotifications())
+
+		// Committed set to 1
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Commit: 1, Vote: 1, Term: 1}))
+		require.Equal(t, storage.LSN(1), rs.committedLSN)
+
+		// Receive notification from low water mark -> 1
+		require.Equal(t, []mockNotification{
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(1),
+			},
+		}, rs.consumer.(*mockConsumer).GetNotifications())
+
+		// Committed set to 2
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Commit: 2, Vote: 1, Term: 1}))
+		require.Equal(t, storage.LSN(2), rs.committedLSN)
+
+		// Receive notification from low water mark -> 2
+		require.Equal(t, []mockNotification{
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(1),
+			},
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(2),
+			},
+		}, rs.consumer.(*mockConsumer).GetNotifications())
+
+		// Committed set to 3
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Commit: 3, Vote: 1, Term: 1}))
+		require.Equal(t, storage.LSN(3), rs.committedLSN)
+
+		// Receive notification from low water mark -> 3
+		require.Equal(t, []mockNotification{
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(1),
+			},
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(2),
+			},
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(3),
+			},
+		}, rs.consumer.(*mockConsumer).GetNotifications())
+	})
+
+	t.Run("notify consumer since the low water mark", func(t *testing.T) {
+		t.Parallel()
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		// Pre-populate the log with entries.
+		entries := []raftpb.Entry{
+			{Index: 1, Term: 1},
+			{Index: 2, Term: 1},
+			{Index: 3, Term: 1},
+		}
+		for _, entry := range entries {
+			require.NoError(t, insertEntry(t, ctx, rs, entry))
+		}
+
+		// The consumer has not received any notifications yet. The highest appendedLSN is 3.
+		require.Empty(t, rs.consumer.(*mockConsumer).GetNotifications())
+
+		// Committed set to 1
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Commit: 1, Vote: 1, Term: 1}))
+		require.Equal(t, storage.LSN(1), rs.committedLSN)
+
+		// Receive notification from 1 -> 1
+		require.Equal(t, []mockNotification{
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(1),
+			},
+		}, rs.consumer.(*mockConsumer).GetNotifications())
+
+		// Simulate applying up to log entry 1
+		require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, storage.LSN(1)))
+		require.Equal(t, storage.LSN(2), rs.localLog.LowWaterMark())
+
+		// Committed set to 2
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Commit: 2, Vote: 1, Term: 1}))
+		require.Equal(t, storage.LSN(2), rs.committedLSN)
+
+		// Receive notification from 2 -> 2
+		require.Equal(t, []mockNotification{
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(1),
+			},
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(2),
+				highWaterMark: storage.LSN(2),
+			},
+		}, rs.consumer.(*mockConsumer).GetNotifications())
+
+		// Committed set to 3, but don't update low water mark
+		require.NoError(t, rs.saveHardState(raftpb.HardState{Commit: 3, Vote: 1, Term: 1}))
+		require.Equal(t, storage.LSN(3), rs.committedLSN)
+
+		// Receive notification from 2 -> 3
+		require.Equal(t, []mockNotification{
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(1),
+				highWaterMark: storage.LSN(1),
+			},
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(2),
+				highWaterMark: storage.LSN(2),
+			},
+			{
+				storageName:   rs.storageName,
+				partitionID:   rs.partitionID,
+				lowWaterMark:  storage.LSN(2),
+				highWaterMark: storage.LSN(3),
+			},
+		}, rs.consumer.(*mockConsumer).GetNotifications())
+
+		// Simulate applying up to log entry 3
+		require.NoError(t, rs.localLog.AcknowledgePosition(log.AppliedPosition, storage.LSN(3)))
+		require.Equal(t, storage.LSN(4), rs.localLog.LowWaterMark())
+
+		// No new notifications are sent.
+		require.Equal(t, 3, len(rs.consumer.(*mockConsumer).GetNotifications()))
+	})
+
+	t.Run("reject LSN beyond appendedLSN", func(t *testing.T) {
+		t.Parallel()
+		ctx := testhelper.Context(t)
+		cfg := testcfg.Build(t)
+		rs := setupStorage(t, ctx, cfg)
+
+		entries := []raftpb.Entry{
+			{Index: 1, Term: 1},
+			{Index: 2, Term: 1},
+		}
+		for _, entry := range entries {
+			require.NoError(t, insertEntry(t, ctx, rs, entry))
+		}
+
+		err := rs.saveHardState(raftpb.HardState{
+			Term:   1,
+			Vote:   1,
+			Commit: 3,
+		})
+		require.ErrorContains(t, err, "next committed LSN exceeds appended LSN 3 > 2")
+	})
+}
-- 
GitLab