diff --git a/internal/backup/log_entry.go b/internal/backup/log_entry.go index d347a644888a8dbdf3e81d8b399a697a22b5625a..c6f85fb6e76b76f992c849c4cb2b77386ae68882 100644 --- a/internal/backup/log_entry.go +++ b/internal/backup/log_entry.go @@ -538,6 +538,15 @@ func (s LogEntryStore) Exists(ctx context.Context, info PartitionInfo, lsn stora return exists, nil } +// GetReader returns a reader for the archived log entry stored at archivePath(info, lsn). The caller is responsible for closing it. +func (s LogEntryStore) GetReader(ctx context.Context, info PartitionInfo, lsn storage.LSN) (io.ReadCloser, error) { + r, err := s.sink.GetReader(ctx, archivePath(info, lsn)) + if err != nil { + return nil, fmt.Errorf("get reader: %w", err) + } + return r, nil +} + // GetWriter returns a writer in order to write a new log entry into the store. func (s LogEntryStore) GetWriter(ctx context.Context, info PartitionInfo, lsn storage.LSN) (io.WriteCloser, error) { w, err := s.sink.GetWriter(ctx, archivePath(info, lsn)) diff --git a/internal/cli/gitaly/subcmd_recovery.go b/internal/cli/gitaly/subcmd_recovery.go index 9f0a99ff10c72fcf5295f4483634aeb0df4ae6b7..58c579412c0502e2364e6e5c9a5d8a8d1ae99ffd 100644 --- a/internal/cli/gitaly/subcmd_recovery.go +++ b/internal/cli/gitaly/subcmd_recovery.go @@ -1,9 +1,12 @@ package gitaly import ( + "archive/tar" "errors" "fmt" + "io" "os" + "path/filepath" "strconv" "strings" "time" @@ -19,6 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" @@ -31,6 +35,17 @@ const ( flagPartition = "partition" ) +type recoveryContext struct { + appliedLSN 
storage.LSN + relativePaths []string + partition storage.Partition + partitionID storage.PartitionID + storageName string + logWriter storage.LogWriter + logEntryStore backup.LogEntryStore + cleanupFuncs []func() error +} + func newRecoveryCommand() *cli.Command { return &cli.Command{ Name: "recovery", @@ -58,35 +73,240 @@ Example: gitaly recovery --config gitaly.config.toml status --storage default -- }, }, }, + { + Name: "replay", + Usage: "apply all available contiguous archived log entries for a partition, gitaly must be stopped before running this command", + UsageText: `gitaly recovery --config replay [command options] + + Example: gitaly recovery --config gitaly.config.toml replay --storage default --partition 2`, + Action: recoveryReplayAction, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: flagStorage, + Usage: "storage containing the partition", + }, + &cli.StringFlag{ + Name: flagPartition, + Usage: "partition ID", + }, + }, + }, }, } } func recoveryStatusAction(ctx *cli.Context) (returnErr error) { - logger := log.ConfigureCommand() + recoveryContext, err := setupRecoveryContext(ctx) + if err != nil { + return fmt.Errorf("setup recovery context: %w", err) + } + defer func() { + returnErr = errors.Join(returnErr, recoveryContext.Cleanup()) + }() - cfg, err := loadConfig(ctx.String(flagConfig)) + fmt.Fprintf(ctx.App.Writer, "Partition ID: %s\n", recoveryContext.partitionID.String()) + fmt.Fprintf(ctx.App.Writer, "Applied LSN: %s\n", recoveryContext.appliedLSN.String()) + + if len(recoveryContext.relativePaths) > 0 { + fmt.Fprintf(ctx.App.Writer, "Relative paths:\n") + for _, relativePath := range recoveryContext.relativePaths { + fmt.Fprintf(ctx.App.Writer, " - %s\n", relativePath) + } + } + + entries := recoveryContext.logEntryStore.Query(backup.PartitionInfo{ + PartitionID: recoveryContext.partitionID, + StorageName: recoveryContext.storageName, + }, recoveryContext.appliedLSN+1) + + fmt.Fprintf(ctx.App.Writer, "Available backup entries:\n") + for 
entries.Next(ctx.Context) { + fmt.Fprintf(ctx.App.Writer, " - %s\n", entries.LSN()) + } + + if err := entries.Err(); err != nil { + return fmt.Errorf("query log entry store: %w", err) + } + + return nil +} + +func recoveryReplayAction(ctx *cli.Context) (returnErr error) { + recoveryContext, err := setupRecoveryContext(ctx) if err != nil { - return fmt.Errorf("load config: %w", err) + return fmt.Errorf("setup recovery context: %w", err) } + defer func() { + returnErr = errors.Join(returnErr, recoveryContext.Cleanup()) + }() - runtimeDir, err := os.MkdirTemp("", "gitaly-recovery-*") + tempDir, err := os.MkdirTemp("", "gitaly-recovery-replay-*") if err != nil { - return fmt.Errorf("creating runtime dir: %w", err) + return fmt.Errorf("create temp dir: %w", err) + } + defer func() { + if err := os.RemoveAll(tempDir); err != nil { + returnErr = errors.Join(returnErr, fmt.Errorf("removing temp dir: %w", err)) + } + }() + + fmt.Fprintf(ctx.App.Writer, "Partition ID: %s\n", recoveryContext.partitionID.String()) + fmt.Fprintf(ctx.App.Writer, "Applied LSN: %s\n", recoveryContext.appliedLSN.String()) + fmt.Fprintf(ctx.App.Writer, "Starting archived log entries import\n") + + partitionInfo := backup.PartitionInfo{ + PartitionID: recoveryContext.partitionID, + StorageName: recoveryContext.storageName, } + nextLSN := recoveryContext.appliedLSN + 1 + finalLSN := recoveryContext.appliedLSN + + iterator := recoveryContext.logEntryStore.Query(backup.PartitionInfo{ + PartitionID: recoveryContext.partitionID, + StorageName: recoveryContext.storageName, + }, nextLSN) + for iterator.Next(ctx.Context) { + if nextLSN != iterator.LSN() { + return fmt.Errorf("there is discontinuity in the WAL entries. 
Expected: %d, Got: %d", nextLSN, iterator.LSN()) + } + + reader, err := recoveryContext.logEntryStore.GetReader(ctx.Context, partitionInfo, nextLSN) + if err != nil { + return fmt.Errorf("get reader for entry with LSN %s: %w", nextLSN, err) + } + + if err := processLogEntry(reader, tempDir, recoveryContext.logWriter, nextLSN); err != nil { + reader.Close() + return fmt.Errorf("process log entry %s: %w", nextLSN, err) + } + reader.Close() + finalLSN = nextLSN + nextLSN++ + } + + if err := iterator.Err(); err != nil { + return fmt.Errorf("query log entry store: %w", err) + } + + fmt.Fprintf(ctx.App.Writer, "Successfully processed log entries up to LSN %s\n", finalLSN) + + return nil +} + +func processLogEntry(reader io.Reader, tempDir string, logWriter storage.LogWriter, lsn storage.LSN) (returnErr error) { + path := filepath.Join(tempDir, lsn.String()) + + if err := downloadArchive(reader, path); err != nil { + return fmt.Errorf("download archive: %w", err) + } + + if err := extractArchive(path); err != nil { + return fmt.Errorf("extract archive: %w", err) + } + + appendedLSN, err := logWriter.AppendLogEntry(path) + if err != nil { + return fmt.Errorf("append log entry: %w", err) + } + if appendedLSN != lsn { + return fmt.Errorf("appended LSN %s does not match expected LSN %s", appendedLSN, lsn) + } + + return nil +} + +func downloadArchive(reader io.Reader, path string) error { + archivePath := path + ".tar" + file, err := os.Create(archivePath) + if err != nil { + return fmt.Errorf("create archive file: %w", err) + } + defer file.Close() + + _, err = io.Copy(file, reader) + if err != nil { + return fmt.Errorf("copy archive content: %w", err) + } + + return nil +} + +func extractArchive(path string) error { + if err := os.MkdirAll(path, mode.Directory); err != nil { + return fmt.Errorf("create destination directory: %w", err) + } + + archivePath := path + ".tar" + archiveFile, err := os.Open(archivePath) + if err != nil { + return fmt.Errorf("open archive file: 
%w", err) + } + defer archiveFile.Close() + + tr := tar.NewReader(archiveFile) + for { + header, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read tar header: %w", err) + } + + target := filepath.Join(path, header.Name) + switch header.Typeflag { + case tar.TypeDir: + if err := os.MkdirAll(target, mode.Directory); err != nil { + return fmt.Errorf("create directory: %w", err) + } + case tar.TypeReg: + f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) + if err != nil { + return fmt.Errorf("create file: %w", err) + } + if _, err := io.Copy(f, tr); err != nil { + f.Close() + return fmt.Errorf("write file content: %w", err) + } + f.Close() + default: + return fmt.Errorf("tar header type not supported: %d", header.Typeflag) + } + } + + return nil +} + +func setupRecoveryContext(ctx *cli.Context) (rc recoveryContext, returnErr error) { + recoveryContext := recoveryContext{} defer func() { - if err := os.RemoveAll(runtimeDir); err != nil { - returnErr = errors.Join(returnErr, fmt.Errorf("removing runtime dir: %w", err)) + if returnErr != nil { + returnErr = errors.Join(returnErr, recoveryContext.Cleanup()) } }() + logger := log.ConfigureCommand() + + cfg, err := loadConfig(ctx.String(flagConfig)) + if err != nil { + return recoveryContext, fmt.Errorf("load config: %w", err) + } + + runtimeDir, err := os.MkdirTemp("", "gitaly-recovery-*") + if err != nil { + return recoveryContext, fmt.Errorf("creating runtime dir: %w", err) + } + recoveryContext.cleanupFuncs = append(recoveryContext.cleanupFuncs, func() error { + return os.RemoveAll(runtimeDir) + }) + cfg.RuntimeDir = runtimeDir if err := gitaly.UnpackAuxiliaryBinaries(cfg.RuntimeDir, func(binaryName string) bool { return strings.HasPrefix(binaryName, "gitaly-git") }); err != nil { - return fmt.Errorf("unpack auxiliary binaries: %w", err) + return recoveryContext, fmt.Errorf("unpack auxiliary binaries: %w", err) } dbMgr, err := 
databasemgr.NewDBManager( @@ -97,25 +317,34 @@ func recoveryStatusAction(ctx *cli.Context) (returnErr error) { logger, ) if err != nil { - return fmt.Errorf("new db manager: %w", err) + return recoveryContext, fmt.Errorf("new db manager: %w", err) } - defer dbMgr.Close() - - locator := config.NewLocator(cfg) + recoveryContext.cleanupFuncs = append(recoveryContext.cleanupFuncs, func() error { + dbMgr.Close() + return nil + }) gitCmdFactory, cleanup, err := gitcmd.NewExecCommandFactory(cfg, logger) if err != nil { - return fmt.Errorf("creating Git command factory: %w", err) + return recoveryContext, fmt.Errorf("creating Git command factory: %w", err) } - defer cleanup() + recoveryContext.cleanupFuncs = append(recoveryContext.cleanupFuncs, func() error { + cleanup() + return nil + }) catfileCache := catfile.NewCache(cfg) - defer catfileCache.Stop() + recoveryContext.cleanupFuncs = append(recoveryContext.cleanupFuncs, func() error { + catfileCache.Stop() + return nil + }) housekeepingMetrics := housekeeping.NewMetrics(cfg.Prometheus) partitionMetrics := partition.NewMetrics(housekeepingMetrics) storageMetrics := storagemgr.NewMetrics(cfg.Prometheus) + locator := config.NewLocator(cfg) + node, err := nodeimpl.NewManager( cfg.Storages, storagemgr.NewFactory( @@ -134,14 +363,17 @@ func recoveryStatusAction(ctx *cli.Context) (returnErr error) { ), ) if err != nil { - return fmt.Errorf("new node: %w", err) + return recoveryContext, fmt.Errorf("new node: %w", err) } - defer node.Close() + recoveryContext.cleanupFuncs = append(recoveryContext.cleanupFuncs, func() error { + node.Close() + return nil + }) storageName := ctx.String(flagStorage) if storageName == "" { if len(cfg.Storages) != 1 { - return fmt.Errorf("multiple storages configured: use --storage to specify the one you want") + return recoveryContext, fmt.Errorf("multiple storages configured: use --storage to specify the one you want") } storageName = cfg.Storages[0].Name @@ -149,72 +381,55 @@ func 
recoveryStatusAction(ctx *cli.Context) (returnErr error) { nodeStorage, err := node.GetStorage(storageName) if err != nil { - return fmt.Errorf("get storage: %w", err) + return recoveryContext, fmt.Errorf("get storage: %w", err) } var partitionID storage.PartitionID if err := parsePartitionID(&partitionID, ctx.String(flagPartition)); err != nil { - return fmt.Errorf("parse partition ID: %w", err) + return recoveryContext, fmt.Errorf("parse partition ID: %w", err) } if partitionID == 0 { - return fmt.Errorf("invalid partition ID %s", partitionID) + return recoveryContext, fmt.Errorf("invalid partition ID %s", partitionID) } partition, err := nodeStorage.GetPartition(ctx.Context, partitionID) if err != nil { - return fmt.Errorf("get partition: %w", err) + return recoveryContext, fmt.Errorf("get partition: %w", err) } - defer partition.Close() + recoveryContext.cleanupFuncs = append(recoveryContext.cleanupFuncs, func() error { + partition.Close() + return nil + }) txn, err := partition.Begin(ctx.Context, storage.BeginOptions{ RelativePaths: []string{}, }) if err != nil { - return fmt.Errorf("begin: %w", err) + return recoveryContext, fmt.Errorf("begin: %w", err) } - defer func() { - err := txn.Rollback(ctx.Context) - returnErr = errors.Join(returnErr, err) - }() - - appliedLSN := txn.SnapshotLSN() - relativePaths := txn.PartitionRelativePaths() + recoveryContext.cleanupFuncs = append(recoveryContext.cleanupFuncs, func() error { + return txn.Rollback(ctx.Context) + }) - fmt.Fprintf(ctx.App.Writer, "Partition ID: %s\n", partitionID.String()) - fmt.Fprintf(ctx.App.Writer, "Applied LSN: %s\n", appliedLSN.String()) - - if len(relativePaths) > 0 { - fmt.Fprintf(ctx.App.Writer, "Relative paths:\n") - for _, relativePath := range relativePaths { - fmt.Fprintf(ctx.App.Writer, " - %s\n", relativePath) - } - } + recoveryContext.appliedLSN = txn.SnapshotLSN() + recoveryContext.relativePaths = txn.PartitionRelativePaths() if cfg.Backup.WALGoCloudURL == "" { - return 
fmt.Errorf("write-ahead log backup is not configured") + return recoveryContext, fmt.Errorf("write-ahead log backup is not configured") } - sink, err := backup.ResolveSink(ctx.Context, cfg.Backup.WALGoCloudURL) if err != nil { - return fmt.Errorf("resolve sink: %w", err) + return recoveryContext, fmt.Errorf("resolve sink: %w", err) } - logEntryStore := backup.NewLogEntryStore(sink) - entries := logEntryStore.Query(backup.PartitionInfo{ - PartitionID: partitionID, - StorageName: storageName, - }, appliedLSN+1) - fmt.Fprintf(ctx.App.Writer, "Available backup entries:\n") - for entries.Next(ctx.Context) { - fmt.Fprintf(ctx.App.Writer, " - %s\n", entries.LSN()) - } + recoveryContext.partition = partition + recoveryContext.partitionID = partitionID + recoveryContext.storageName = storageName + recoveryContext.logWriter = partition.GetLogWriter() + recoveryContext.logEntryStore = backup.NewLogEntryStore(sink) - if err := entries.Err(); err != nil { - return fmt.Errorf("query log entry store: %w", err) - } - - return nil + return recoveryContext, nil } func parsePartitionID(id *storage.PartitionID, value string) error { @@ -227,3 +442,13 @@ func parsePartitionID(id *storage.PartitionID, value string) error { return nil } + +// Cleanup runs every registered cleanup function in reverse registration order and returns their errors joined together. +func (rc *recoveryContext) Cleanup() error { + var err error + // Unwind last-registered first (the order deferred calls would run in) so that resources acquired later are released before the ones acquired earlier. + for i := len(rc.cleanupFuncs) - 1; i >= 0; i-- { + err = errors.Join(err, rc.cleanupFuncs[i]()) + } + return err +} diff --git a/internal/cli/gitaly/subcmd_recovery_test.go b/internal/cli/gitaly/subcmd_recovery_test.go index b0f667cefa1e95f07da5e715c7190849e02f7eeb..daae8daec33de9542ab511f036615c1afb5497b1 100644 --- a/internal/cli/gitaly/subcmd_recovery_test.go +++ b/internal/cli/gitaly/subcmd_recovery_test.go @@ -1,6 +1,8 @@ package gitaly import ( + "archive/tar" + "bytes" "context" "errors" "fmt" @@ -28,6 +30,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + 
"google.golang.org/protobuf/proto" ) func TestRecoveryCLI_status(t *testing.T) { @@ -258,3 +261,346 @@ Available backup entries: }) } } + +func TestRecoveryCLI_replay(t *testing.T) { + t.Parallel() + + type setupOptions struct { + cfg config.Cfg + storageMgr node.Storage + locator storage.Locator + gitCmdFactory gitcmd.CommandFactory + catfileCache catfile.Cache + backupRoot string + } + + type setupData struct { + storageName string + partitionID storage.PartitionID + expectedErr error + expectedOutput string + expectedLSN storage.LSN + } + + for _, tc := range []struct { + desc string + setup func(tb testing.TB, ctx context.Context, opts setupOptions) setupData + }{ + { + desc: "unknown storage", + setup: func(tb testing.TB, ctx context.Context, opts setupOptions) setupData { + return setupData{ + storageName: "pineapple", + expectedErr: errors.New("exit status 1"), + expectedOutput: "get storage: storage name not found\n", + expectedLSN: storage.LSN(0), + } + }, + }, + { + desc: "partition 0", + setup: func(tb testing.TB, ctx context.Context, opts setupOptions) setupData { + return setupData{ + storageName: opts.cfg.Storages[0].Name, + partitionID: 0, + expectedErr: errors.New("exit status 1"), + expectedOutput: fmt.Sprintf("invalid partition ID %s\n", storage.PartitionID(0)), + expectedLSN: storage.LSN(0), + } + }, + }, + { + desc: "unknown partition", + setup: func(tb testing.TB, ctx context.Context, opts setupOptions) setupData { + return setupData{ + storageName: opts.cfg.Storages[0].Name, + partitionID: 42, + // TODO: This currently will create arbitrary partitions. + // It should return an error instead. 
+ // https://gitlab.com/gitlab-org/gitaly/-/issues/6478 + expectedOutput: fmt.Sprintf(`Partition ID: %s +Applied LSN: %s +Starting archived log entries import +Successfully processed log entries up to LSN %s +`, + storage.PartitionID(42), + storage.LSN(0), + storage.LSN(0), + ), + expectedLSN: storage.LSN(0), + } + }, + }, + { + desc: "success, no backups", + setup: func(tb testing.TB, ctx context.Context, opts setupOptions) setupData { + logger := testhelper.SharedLogger(t) + repo := &gitalypb.Repository{ + StorageName: opts.cfg.Storages[0].Name, + RelativePath: gittest.NewRepositoryName(t), + } + + txn, err := opts.storageMgr.Begin(ctx, storage.TransactionOptions{ + RelativePath: repo.GetRelativePath(), + AllowPartitionAssignmentWithoutRepository: true, + }) + require.NoError(t, err) + + err = repoutil.Create( + storage.ContextWithTransaction(ctx, txn), + logger, + opts.locator, + opts.gitCmdFactory, + opts.catfileCache, + transaction.NewTrackingManager(), + counter.NewRepositoryCounter(opts.cfg.Storages), + txn.RewriteRepository(repo), + func(repo *gitalypb.Repository) error { + return nil + }, + ) + require.NoError(t, err) + + require.NoError(t, txn.Commit(ctx)) + + return setupData{ + storageName: repo.GetStorageName(), + partitionID: 2, + expectedOutput: fmt.Sprintf(`Partition ID: %s +Applied LSN: %s +Starting archived log entries import +Successfully processed log entries up to LSN %s +`, + storage.PartitionID(2), + storage.LSN(1), + storage.LSN(1), + ), + expectedLSN: storage.LSN(1), + } + }, + }, + { + desc: "success, contiguous backups", + setup: func(tb testing.TB, ctx context.Context, opts setupOptions) setupData { + logger := testhelper.SharedLogger(t) + repo := &gitalypb.Repository{ + StorageName: opts.cfg.Storages[0].Name, + RelativePath: gittest.NewRepositoryName(t), + } + + txn, err := opts.storageMgr.Begin(ctx, storage.TransactionOptions{ + RelativePath: repo.GetRelativePath(), + AllowPartitionAssignmentWithoutRepository: true, + }) + 
require.NoError(t, err) + + err = repoutil.Create( + storage.ContextWithTransaction(ctx, txn), + logger, + opts.locator, + opts.gitCmdFactory, + opts.catfileCache, + transaction.NewTrackingManager(), + counter.NewRepositoryCounter(opts.cfg.Storages), + txn.RewriteRepository(repo), + func(repo *gitalypb.Repository) error { + return nil + }, + ) + require.NoError(t, err) + + require.NoError(t, txn.Commit(ctx)) + + partitionPath := filepath.Join(repo.GetStorageName(), fmt.Sprintf("%d", storage.PartitionID(2))) + testhelper.WriteFiles(t, opts.backupRoot, map[string]any{ + filepath.Join(partitionPath, storage.LSN(1).String()+".tar"): createValidLogEntryArchive(t, repo.GetRelativePath()), + filepath.Join(partitionPath, storage.LSN(2).String()+".tar"): createValidLogEntryArchive(t, repo.GetRelativePath()), + filepath.Join(partitionPath, storage.LSN(3).String()+".tar"): createValidLogEntryArchive(t, repo.GetRelativePath()), + }) + + return setupData{ + storageName: repo.GetStorageName(), + partitionID: 2, + expectedOutput: fmt.Sprintf(`Partition ID: %s +Applied LSN: %s +Starting archived log entries import +Successfully processed log entries up to LSN %s +`, + storage.PartitionID(2), + storage.LSN(1), + storage.LSN(3), + ), + expectedLSN: storage.LSN(3), + } + }, + }, + { + desc: "non-contiguous backups", + setup: func(tb testing.TB, ctx context.Context, opts setupOptions) setupData { + logger := testhelper.SharedLogger(t) + repo := &gitalypb.Repository{ + StorageName: opts.cfg.Storages[0].Name, + RelativePath: gittest.NewRepositoryName(t), + } + + txn, err := opts.storageMgr.Begin(ctx, storage.TransactionOptions{ + RelativePath: repo.GetRelativePath(), + AllowPartitionAssignmentWithoutRepository: true, + }) + require.NoError(t, err) + + err = repoutil.Create( + storage.ContextWithTransaction(ctx, txn), + logger, + opts.locator, + opts.gitCmdFactory, + opts.catfileCache, + transaction.NewTrackingManager(), + counter.NewRepositoryCounter(opts.cfg.Storages), + 
txn.RewriteRepository(repo), + func(repo *gitalypb.Repository) error { + return nil + }, + ) + require.NoError(t, err) + + require.NoError(t, txn.Commit(ctx)) + + partitionPath := filepath.Join(repo.GetStorageName(), fmt.Sprintf("%d", storage.PartitionID(2))) + testhelper.WriteFiles(t, opts.backupRoot, map[string]any{ + filepath.Join(partitionPath, storage.LSN(1).String()+".tar"): createValidLogEntryArchive(t, repo.GetRelativePath()), + filepath.Join(partitionPath, storage.LSN(2).String()+".tar"): createValidLogEntryArchive(t, repo.GetRelativePath()), + filepath.Join(partitionPath, storage.LSN(4).String()+".tar"): createValidLogEntryArchive(t, repo.GetRelativePath()), + }) + + return setupData{ + storageName: repo.GetStorageName(), + partitionID: 2, + expectedErr: errors.New("exit status 1"), + expectedOutput: "there is discontinuity in the WAL entries. Expected: 3, Got: 4\n", + expectedLSN: storage.LSN(2), + } + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + backupRoot := t.TempDir() + tCtx := testhelper.Context(t) + cfg := testcfg.Build(t) + cfg.Backup.WALGoCloudURL = backupRoot + configPath := testcfg.WriteTemporaryGitalyConfigFile(t, cfg) + testcfg.BuildGitaly(t, cfg) + + logger := testhelper.SharedLogger(t) + + ctx, cancel := context.WithCancel(tCtx) + defer cancel() + + dbMgr, err := databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) + require.NoError(t, err) + defer dbMgr.Close() + + locator := config.NewLocator(cfg) + gitCmdFactory := gittest.NewCommandFactory(t, cfg) + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + storageMgr, err := storagemgr.NewStorageManager( + logger, + cfg.Storages[0].Name, + cfg.Storages[0].Path, + dbMgr, + partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), + nil, + ), + 1, + 
storagemgr.NewMetrics(cfg.Prometheus), + ) + require.NoError(t, err) + + data := tc.setup(t, ctx, setupOptions{ + cfg: cfg, + storageMgr: storageMgr, + locator: locator, + gitCmdFactory: gitCmdFactory, + catfileCache: catfileCache, + backupRoot: backupRoot, + }) + + // Stop storage and DB so that we can run the command "offline" + storageMgr.Close() + dbMgr.Close() + + cmd := exec.Command(cfg.BinaryPath("gitaly"), "recovery", "-config", configPath, "replay", "-storage", data.storageName, "-partition", data.partitionID.String()) + + output, err := cmd.CombinedOutput() + testhelper.RequireGrpcError(t, data.expectedErr, err) + + require.Contains(t, string(output), data.expectedOutput) + + // Creating storage manager again as we had to close it previously to run the command in offline mode + dbMgr, err = databasemgr.NewDBManager(ctx, cfg.Storages, keyvalue.NewBadgerStore, helper.NewNullTickerFactory(), logger) + require.NoError(t, err) + defer dbMgr.Close() + + storageMgr, err = storagemgr.NewStorageManager( + logger, + cfg.Storages[0].Name, + cfg.Storages[0].Path, + dbMgr, + partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partition.NewMetrics(housekeeping.NewMetrics(cfg.Prometheus)), + nil, + ), + 1, + storagemgr.NewMetrics(cfg.Prometheus), + ) + require.NoError(t, err) + defer storageMgr.Close() + + partition, err := storageMgr.GetPartition(ctx, data.partitionID) + require.NoError(t, err) + + tr, err := partition.Begin(ctx, storage.BeginOptions{}) + require.NoError(t, err) + appliedLSN := tr.SnapshotLSN() + require.NoError(t, tr.Rollback(ctx)) + require.Equal(t, data.expectedLSN, appliedLSN) + }) + } +} + +func createValidLogEntryArchive(t *testing.T, repoRelativePath string) []byte { + t.Helper() + + var buf bytes.Buffer + tw := tar.NewWriter(&buf) + + // Create a dummy MANIFEST file + manifest := &gitalypb.LogEntry{ + RelativePath: repoRelativePath, + Operations: []*gitalypb.LogEntry_Operation{}, + } + 
manifestBytes, err := proto.Marshal(manifest) + require.NoError(t, err) + + err = tw.WriteHeader(&tar.Header{ + Name: "MANIFEST", + Mode: 0o644, + Size: int64(len(manifestBytes)), + }) + require.NoError(t, err) + _, err = tw.Write(manifestBytes) + require.NoError(t, err) + + require.NoError(t, tw.Close()) + + return buf.Bytes() +} diff --git a/internal/gitaly/storage/storage.go b/internal/gitaly/storage/storage.go index 4afbccf00a47476accb7c84fe9360175e40cf2e7..f493bdd6af11ae82f7ec2b5f42e03bf6c99dc445 100644 --- a/internal/gitaly/storage/storage.go +++ b/internal/gitaly/storage/storage.go @@ -180,9 +180,17 @@ type LogReader interface { AcknowledgeAppliedPosition(lsn LSN) } +// LogWriter adds entries to the Write-Ahead Log. +type LogWriter interface { + // AppendLogEntry appends an entry to the WAL. logEntryPath specifies the directory of the log entry. It returns + // the Log Sequence Number (LSN) of the appended log entry. + AppendLogEntry(logEntryPath string) (LSN, error) +} + // LogManager is the interface used to manage the underlying Write-Ahead Log entries. type LogManager interface { LogReader + LogWriter // Initialize sets up the initial state of the LogManager, preparing it to manage log entries. // It ensures the environment is ready, and previous states are resumed correctly. @@ -192,10 +200,6 @@ type LogManager interface { // is blocked until complete. Close() error - // AppendLogEntry appends an entry to the WAL. logEntryPath specifies the directory of the log entry. It returns - // the Log Sequence Number (LSN) of the appended log entry. - AppendLogEntry(logEntryPath string) (LSN, error) - // AppendedLSN returns the LSN of the latest appended log entry. AppendedLSN() LSN @@ -212,6 +216,8 @@ type Partition interface { // GetLogReader provides controlled access to underlying log management system for log consumption purpose. // It allows the consumers to access to on-disk location of a LSN and acknowledge consumed position. 
GetLogReader() LogReader + // GetLogWriter provides controlled access to underlying log management system for log appending purpose. + GetLogWriter() LogWriter } // TransactionOptions are used to pass transaction options into Begin. diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 05a41952df1b8c7d1d844491586e41b6e8aed4a8..f9d6efabe85113933f2833d45f477ced8e78d8c4 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -874,6 +874,11 @@ func (mgr *TransactionManager) GetLogReader() storage.LogReader { return mgr.logManager } +// GetLogWriter provides controlled access to underlying log management system for log appending purpose. +func (mgr *TransactionManager) GetLogWriter() storage.LogWriter { + return mgr.logManager +} + // TransactionManager is responsible for transaction management of a single repository. Each repository has // a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from // the admissionQueue. Each admitted write is processed in three steps: