diff --git a/internal/gitaly/service/ref/util.go b/internal/gitaly/service/ref/util.go
index c7201b87e88bcf18f8655dea8ab0008c3575b153..a4bbb857a701875e2fd693fa0e577a2d77b7de4f 100644
--- a/internal/gitaly/service/ref/util.go
+++ b/internal/gitaly/service/ref/util.go
@@ -203,7 +203,8 @@ type findRefsOpts struct {
 	cmdArgs []gitcmd.Option
 	delim   byte
 	lines.SenderOpts
-	sortBy string
+	sortBy    string
+	PageToken string
 }
 
 // Iterator is an iterator that iterates over a set of references
@@ -215,18 +216,18 @@ type Iterator interface {
 }
 
 type commitIterator struct {
-	reader         *bufio.Reader
-	err            error
-	currentBranch  *gitalypb.Branch
-	stderr         bytes.Buffer
-	numLines       int
-	foundPageToken bool
-	opts           *findRefsOpts
-	done           bool
-	cmd            *command.Command
-	lineDelimiter  []byte
-	accumulated    []byte
-	buffer         []byte
+	reader                *bufio.Reader
+	err                   error
+	currentBranch         *gitalypb.Branch
+	stderr, getRefsStderr bytes.Buffer
+	numLines              int
+	foundPageToken        bool
+	opts                  *findRefsOpts
+	done                  bool
+	cmd, getRefsCmd       *command.Command
+	lineDelimiter         []byte
+	accumulated           []byte
+	buffer                []byte
 }
 
 var fullCommitFields = []string{
@@ -265,27 +266,89 @@ func NewBranchIterator(
 		buffer: make([]byte, 4096),
 	}
 
-	options := []gitcmd.Option{
-		// %00 inserts the null character into the output (see for-each-ref docs)
-		gitcmd.Flag{Name: "--format=" + strings.Join(fullCommitFields, "%00") + "%00"},
+	var options []gitcmd.Option
+
+	if opts.PageToken == "" && opts.Limit > 0 {
+		options = append(options, gitcmd.Flag{Name: fmt.Sprintf("--count=%d", opts.Limit)})
 	}
 
 	if opts.sortBy != "" {
 		options = append(options, gitcmd.Flag{Name: "--sort=" + opts.sortBy})
-	}
 
-	cmd, err := repo.Exec(ctx, gitcmd.Command{
-		Name:  "for-each-ref",
-		Flags: options,
-		Args:  patterns,
-	}, gitcmd.WithSetupStdout(), gitcmd.WithStderr(&c.stderr))
-	if err != nil {
-		return nil, fmt.Errorf("spawning for-each-ref: %w", err)
-	}
+		getRefsCmd, err := repo.Exec(ctx, gitcmd.Command{
+			Name:  "for-each-ref",
+			Flags: append(options, gitcmd.Flag{Name: "--format=%(refname)"}),
+			Args:  patterns,
+		}, gitcmd.WithSetupStdout(), gitcmd.WithStderr(&c.getRefsStderr))
+		if err != nil {
+			return nil, fmt.Errorf("spawning for-each-ref: %w", err)
+		}
+
+		stdin := io.MultiReader(
+			// inject an invalid pattern that matches nothing in the case
+			// that getRefsCmd returns no results.
+			// git-for-each-ref --stdin will treat an empty stdin as if no
+			// patterns were provided.
+			strings.NewReader("refs/heads/invalid:ref\n"),
+			getRefsCmd,
+		)
+
+		options = []gitcmd.Option{
+			gitcmd.Flag{Name: "--stdin"},
+			gitcmd.Flag{Name: "--format=" + strings.Join(fullCommitFields, "%00") + "%00"},
+		}
+
+		// since git-for-each-ref --stdin does not preserve input
+		// ordering and sorts by refname by default, we need to add the
+		// sorting again.
+		if opts.sortBy != "" {
+			options = append(options, gitcmd.Flag{Name: "--sort=" + opts.sortBy})
+		}
+		cmd, err := repo.Exec(ctx, gitcmd.Command{
+			Name:  "for-each-ref",
+			Flags: options,
+		}, gitcmd.WithSetupStdout(), gitcmd.WithStdin(stdin), gitcmd.WithStderr(&c.stderr))
+		if err != nil {
+			return nil, fmt.Errorf("spawning for-each-ref: %w", err)
+		}
+
+		c.getRefsCmd = getRefsCmd
+		c.cmd = cmd
+	} else {
+		// --start-after cannot be combined with patterns (see for-each-ref docs), so the
+		// patterns are only dropped when that flag is actually passed.
+		args := patterns
+
+		if opts.PageToken != "" {
+			gitVersion, err := repo.GitVersion(ctx)
+			if err != nil {
+				return nil, fmt.Errorf("getting git version: %w", err)
+			}
 
-	c.cmd = cmd
+			if gitVersion.GreaterOrEqual(git.NewVersion(2, 51, 0, 0)) {
+				options = append(options, gitcmd.Flag{Name: "--start-after=" + opts.PageToken})
+				if opts.Limit > 0 {
+					options = append(options, gitcmd.Flag{Name: fmt.Sprintf("--count=%d", opts.Limit)})
+				}
+				args = nil
+			}
+		}
 
-	reader := bufio.NewReader(cmd)
+		cmd, err := repo.Exec(ctx, gitcmd.Command{
+			Name: "for-each-ref",
+			Flags: append(options,
+				gitcmd.Flag{Name: "--format=" + strings.Join(fullCommitFields, "%00") + "%00"},
+			),
+			Args: args,
+		}, gitcmd.WithSetupStdout(), gitcmd.WithStderr(&c.stderr))
+		if err != nil {
+			return nil, fmt.Errorf("spawning for-each-ref: %w", err)
+		}
+
+		c.cmd = cmd
+	}
+
+	reader := bufio.NewReader(c.cmd)
 
 	c.reader = reader
 
@@ -363,16 +426,32 @@ func (c *commitIterator) Close() error {
 	if c.opts.PageTokenError && !c.foundPageToken {
 		return fmt.Errorf("sending lines: %w", lines.ErrInvalidPageToken)
 	}
 
+	// When we have a limit set up and have sent all references upstream then the call to `Wait()` may
+	// indeed cause us to tear down the still-running git-for-each-ref(1) process. Because we close stdout
+	// before sending a signal the end result may be that the process will die with EPIPE because it failed
+	// to write to stdout.
+	//
+	// This is an expected error though, and thus we ignore it here. A SIGPIPE from the ref-listing
+	// process must not short-circuit Close, as the formatting process below still has to be waited on.
+	if c.getRefsCmd != nil {
+		if err := c.getRefsCmd.Wait(); err != nil {
+			var exitErr *exec.ExitError
+			if !errors.As(err, &exitErr) {
+				return fmt.Errorf("waiting for for-each-ref: %w", err)
+			}
+
+			status, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus)
+			if !ok || !status.Signaled() || status.Signal() != syscall.SIGPIPE {
+				return structerr.New("listing failed with exit code %d", status.ExitStatus()).
+					WithMetadata("stderr", c.getRefsStderr.String())
+			}
+		}
+	}
+
 	if err := c.cmd.Wait(); err != nil {
 		var exitErr *exec.ExitError
 		if errors.As(err, &exitErr) {
-			// When we have a limit set up and have sent all references upstream then the call to `Wait()` may
-			// indeed cause us to tear down the still-running git-for-each-ref(1) process. Because we close stdout
-			// before sending a signal the end result may be that the process will die with EPIPE because it failed
-			// to write to stdout.
-			//
-			// This is an expected error though, and thus we ignore it here.
 			status, ok := exitErr.ProcessState.Sys().(syscall.WaitStatus)
 			if ok && status.Signaled() && status.Signal() == syscall.SIGPIPE {
 				return nil
@@ -401,7 +480,12 @@ func (s *server) findRefsWithIterator(
 	}
 
 	defer func() {
-		err = errors.Join(err, iterator.Close())
+		// NOTE(review): matching on the error text is brittle; prefer
+		// errors.Is(closeErr, os.ErrClosed) if Close wraps that error -- confirm.
+		closeErr := iterator.Close()
+		if closeErr != nil && !strings.Contains(closeErr.Error(), "file already closed") {
+			err = errors.Join(err, closeErr)
+		}
 	}()
 
 	for iterator.Next() {
@@ -485,12 +569,16 @@ type paginationOpts struct {
 	// When PageTokenError is true then the response will return an error when
 	// PageToken is not found.
 	PageTokenError bool
+
+	// PageToken is the page token copied from the pagination parameter.
+	PageToken string
 }
 
 func buildPaginationOpts(ctx context.Context, p *gitalypb.PaginationParameter) *paginationOpts {
 	opts := &paginationOpts{}
 	opts.IsPageToken = func(_ []byte) bool { return true }
 	opts.Limit = math.MaxInt32
+	opts.PageToken = p.GetPageToken()
 
 	if p == nil {
 		return opts
@@ -522,6 +610,7 @@ func buildFindRefsOpts(ctx context.Context, p *gitalypb.PaginationParameter) *fi
 	refsOpts.Limit = opts.Limit
 	refsOpts.IsPageToken = opts.IsPageToken
 	refsOpts.PageTokenError = opts.PageTokenError
+	refsOpts.PageToken = opts.PageToken
 
 	return refsOpts
 }