Buffer pack-objects and upload-pack output; add UnbufferedStartWriter

Files touched by this patch:

* internal/git/pktline/pktline.go: exports MaxSidebandFrameSize
  (maxPktSize - 5: one pkt-line minus the 4 hex length digits and the
  1 band byte) and makes SidebandWriter.writeBand copy header and payload
  into a reusable [maxPktSize]byte buffer, so each frame reaches the
  underlying writer as a single Write call.
* internal/git/pktline/pkt_line_test.go: TestSidebandWriter_MaxSidebandFrameSize
  checks that a MaxSidebandFrameSize payload is sent in one write and a
  payload one byte larger in two.
* internal/gitaly/service/hook/pack_objects.go: wraps the stdout band in a
  bufio.Writer of MaxSidebandFrameSize bytes and returns its Flush error
  after a successful cmd.Wait(); stderr stays unbuffered because
  git-pack-objects sends keepalives on it.
* internal/gitaly/service/smarthttp/upload_pack.go: moves the packfile
  negotiation parser goroutine into statsCollector/wrapStatsCollector and
  buffers the response stream through helper.UnbufferedStartWriter, which
  flushes after every write until more than 32 KiB have been written.
* internal/helper/unbuffered_start.go and unbuffered_start_test.go: the new
  UnbufferedStartWriter and its test.

Review notes:

* This copy had lost its line breaks and tab indentation. It is re-split
  into one line per diff line with gofmt-style tabs/alignment restored;
  every hunk's line counts have been re-checked against its header.
  NOTE(review): whitespace was reconstructed, so confirm with
  `git apply --check` against the pre-image before relying on it.
  Hunk-header function contexts that stop mid-token (e.g. "err er")
  are git's own 80-byte truncation and are intentionally unchanged.
* NOTE(review): on PostUploadPack's cmd.Wait() error path,
  collector.stats() runs twice (explicitly, then via defer). The second
  call closes the pipe writer again (a no-op for *io.PipeWriter) and
  receives from statsCh after its only value was already taken, so it
  gets the zero value once the channel is closed. Harmless, but easy to
  misread.
* NOTE(review): the statsCollector goroutine returns on a parse error
  without reading the rest of pr. Unless stats.ParsePackfileNegotiation
  drains its reader itself, later writes through the TeeReader into pw
  would block. This is unchanged from the pre-image; verify.
* NOTE(review): in pack_objects.go (as far as the hunks show) the buffered
  stdout is not flushed when cmd.Wait() fails, so bytes still in the
  buffer are dropped along with the returned error. Confirm intended.
* NOTE(review): the 5-byte sideband header size is written twice
  (MaxSidebandFrameSize = maxPktSize - 5 and const headerSize = 5 in
  writeBand); a shared constant would keep them in step.

diff --git a/internal/git/pktline/pkt_line_test.go b/internal/git/pktline/pkt_line_test.go
index 32694a7e0ab4eac1d122d50f873a0585fa2d9ae0..dc262634f0378276a779845dfadf855217510573 100644
--- a/internal/git/pktline/pkt_line_test.go
+++ b/internal/git/pktline/pkt_line_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"errors"
 	"io"
+	"io/ioutil"
 	"math"
 	"math/rand"
 	"strings"
@@ -334,3 +335,37 @@ func TestEachSidebandPacket(t *testing.T) {
 		})
 	}
 }
+
+type writeCounter struct {
+	W io.Writer
+	N int
+}
+
+func (wc *writeCounter) Write(p []byte) (int, error) {
+	wc.N++
+	return wc.W.Write(p)
+}
+
+func TestSidebandWriter_MaxSidebandFrameSize(t *testing.T) {
+	testCases := []struct {
+		desc   string
+		size   int
+		writes int
+	}{
+		{desc: "maximum frame size", size: MaxSidebandFrameSize, writes: 1},
+		{desc: "just over maximum frame size", size: MaxSidebandFrameSize + 1, writes: 2},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			w := &writeCounter{W: ioutil.Discard}
+			sw := NewSidebandWriter(w)
+
+			in := strings.Repeat("x", tc.size)
+			n, err := io.WriteString(sw.Writer(0), in)
+			require.NoError(t, err)
+			require.Equal(t, tc.size, n)
+			require.Equal(t, tc.writes, w.N)
+		})
+	}
+}
diff --git a/internal/git/pktline/pktline.go b/internal/git/pktline/pktline.go
index 665d6cbde64d5a1d7e4843be2e7b5f2c5ed72e99..c0f02d513288543b3d4d76919cb18606ea630d29 100644
--- a/internal/git/pktline/pktline.go
+++ b/internal/git/pktline/pktline.go
@@ -13,8 +13,10 @@ import (
 )
 
 const (
-	maxPktSize = 65520 // https://gitlab.com/gitlab-org/git/-/blob/v2.30.0/pkt-line.h#L216
-	pktDelim   = "0001"
+	// MaxSidebandFrameSize is the maximum number of bytes that fits in one Sideband frame
+	MaxSidebandFrameSize = maxPktSize - 5
+	maxPktSize           = 65520 // https://gitlab.com/gitlab-org/git/-/blob/v2.30.0/pkt-line.h#L216
+	pktDelim             = "0001"
 )
 
 // NewScanner returns a bufio.Scanner that splits on Git pktline boundaries
@@ -112,8 +114,9 @@ func pktLineSplitter(data []byte, atEOF bool) (advance int, token []byte, err er
 
 // SidebandWriter multiplexes byte streams into a single side-band-64k stream.
 type SidebandWriter struct {
-	w io.Writer
-	m sync.Mutex
+	w   io.Writer
+	m   sync.Mutex
+	buf [maxPktSize]byte
 }
 
 // NewSidebandWriter instantiates a new SidebandWriter.
@@ -125,17 +128,12 @@ func (sw *SidebandWriter) writeBand(band byte, data []byte) (int, error) {
 
 	n := 0
 	for len(data) > 0 {
-		chunkSize := len(data)
 		const headerSize = 5
-		if max := maxPktSize - headerSize; chunkSize > max {
-			chunkSize = max
-		}
-
-		if _, err := fmt.Fprintf(sw.w, "%04x%s", chunkSize+headerSize, []byte{band}); err != nil {
-			return n, err
-		}
+		chunkSize := copy(sw.buf[headerSize:], data)
+		totalSize := chunkSize + headerSize
+		copy(sw.buf[:headerSize], fmt.Sprintf("%04x%s", totalSize, []byte{band}))
 
-		if _, err := sw.w.Write(data[:chunkSize]); err != nil {
+		if _, err := sw.w.Write(sw.buf[:totalSize]); err != nil {
 			return n, err
 		}
 		data = data[chunkSize:]
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index 783749dbb0e31e1d73f9739fa94131fcc90cbd48..5b66f52094b62a89eba7cdf66ef9bede873f7a74 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -1,6 +1,7 @@
 package hook
 
 import (
+	"bufio"
 	"bytes"
 	"context"
 	"crypto/sha256"
@@ -177,9 +178,24 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb
 	defer stdin.Close()
 
 	sw := pktline.NewSidebandWriter(w)
-	stdout := &countingWriter{W: sw.Writer(bandStdout)}
-	stderrBuf := &bytes.Buffer{}
-	stderr := &countingWriter{W: io.MultiWriter(sw.Writer(bandStderr), stderrBuf)}
+
+	// Using buffered IO makes the write end of the pipe more efficient
+	// (fewer syscalls), but more importantly it also makes the read end more
+	// efficient: each sideband packet we write becomes a gRPC message send
+	// on the read end, and typically each message send is a syscall.
+	// Syscalls saved on the read end are multiplied by the number of cache
+	// hits.
+	stdoutBufferedWriter := bufio.NewWriterSize(
+		sw.Writer(bandStdout),
+		pktline.MaxSidebandFrameSize,
+	)
+	stdout := &countingWriter{W: stdoutBufferedWriter}
+
+	// We intentionally do not buffer stderr. This is because
+	// git-pack-objects uses stderr to send keepalives, and those should be
+	// transmitted without delay.
+	stderrSpy := &bytes.Buffer{}
+	stderr := &countingWriter{W: io.MultiWriter(sw.Writer(bandStderr), stderrSpy)}
 
 	defer func() {
 		generatedBytes := stdout.N + stderr.N
@@ -201,10 +217,10 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb
 	}
 
 	if err := cmd.Wait(); err != nil {
-		return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrBuf.String(), err)
+		return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrSpy.String(), err)
 	}
 
-	return nil
+	return stdoutBufferedWriter.Flush()
 }
 
 var (
diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go
index 1d77d5ca801c47f20aa5effb2a08c4a1fd6c6a86..d60dee70a9f748259097a50c51b5cfb1b9f8f7a6 100644
--- a/internal/gitaly/service/smarthttp/upload_pack.go
+++ b/internal/gitaly/service/smarthttp/upload_pack.go
@@ -1,6 +1,8 @@
 package smarthttp
 
 import (
+	"bufio"
+	"context"
 	"crypto/sha1"
 	"fmt"
 	"io"
@@ -10,6 +12,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/git"
 	"gitlab.com/gitlab-org/gitaly/internal/git/stats"
 	"gitlab.com/gitlab-org/gitaly/internal/gitaly/service/inspect"
+	"gitlab.com/gitlab-org/gitaly/internal/helper"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
 	"gitlab.com/gitlab-org/gitaly/streamio"
 	"google.golang.org/grpc/codes"
@@ -36,29 +39,29 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS
 		return resp.GetData(), err
 	}), h)
 
-	pr, pw := io.Pipe()
-	defer pw.Close()
-	stdin := io.TeeReader(stdinReader, pw)
-	statsCh := make(chan stats.PackfileNegotiation, 1)
-	go func() {
-		defer close(statsCh)
-
-		stats, err := stats.ParsePackfileNegotiation(pr)
-		if err != nil {
-			ctxlogrus.Extract(stream.Context()).WithError(err).Debug("failed parsing packfile negotiation")
-			return
-		}
-		stats.UpdateMetrics(s.packfileNegotiationMetrics)
-
-		statsCh <- stats
-	}()
+	stdin, collector := s.wrapStatsCollector(stream.Context(), stdinReader)
+	defer collector.stats()
 
 	var respBytes int64
 
-	stdoutWriter := streamio.NewWriter(func(p []byte) error {
-		respBytes += int64(len(p))
-		return stream.Send(&gitalypb.PostUploadPackResponse{Data: p})
-	})
+	stdoutWriter := helper.NewUnbufferedStartWriter(
+		bufio.NewWriterSize(
+			streamio.NewWriter(func(p []byte) error {
+				respBytes += int64(len(p))
+				return stream.Send(&gitalypb.PostUploadPackResponse{Data: p})
+			}),
+			streamio.WriteBufferSize,
+		),
+		// Git's progress messages "Enumerating objects" etc act as keepalives so
+		// they should not be delayed by buffering. This number of bytes should
+		// be large enough to hold the combined progress messages.
+		32*1024,
+	)
+	defer func() {
+		// In case of an early return, the output stream may contain messages for
+		// the user so we should still flush it.
+		_ = stdoutWriter.Flush()
+	}()
 
 	// TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519
 	// needs to be removed after we get some statistics on this
@@ -96,8 +99,7 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS
 	}
 
 	if err := cmd.Wait(); err != nil {
-		pw.Close() // ensure PackfileNegotiation parser returns
-		stats := <-statsCh
+		stats := collector.stats()
 
 		if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" {
 			// We have seen a 'deepen' message in the request. It is expected that
@@ -109,8 +111,9 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS
 		return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err)
 	}
 
-	pw.Close() // Ensure PackfileNegotiation parser returns
-	<-statsCh  // Wait for the packfile negotiation parser to finish.
+	if err := stdoutWriter.Flush(); err != nil {
+		return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err)
+	}
 
 	ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details")
 
@@ -124,3 +127,36 @@ func validateUploadPackRequest(req *gitalypb.PostUploadPackRequest) error {
 
 	return nil
 }
+
+type statsCollector struct {
+	c       io.Closer
+	statsCh chan stats.PackfileNegotiation
+}
+
+func (sc *statsCollector) stats() stats.PackfileNegotiation {
+	sc.c.Close()
+	return <-sc.statsCh
+}
+
+func (s *server) wrapStatsCollector(ctx context.Context, r io.Reader) (io.Reader, *statsCollector) {
+	pr, pw := io.Pipe()
+	sc := &statsCollector{
+		c:       pw,
+		statsCh: make(chan stats.PackfileNegotiation, 1),
+	}
+
+	go func() {
+		defer close(sc.statsCh)
+
+		stats, err := stats.ParsePackfileNegotiation(pr)
+		if err != nil {
+			ctxlogrus.Extract(ctx).WithError(err).Debug("failed parsing packfile negotiation")
+			return
+		}
+		stats.UpdateMetrics(s.packfileNegotiationMetrics)
+
+		sc.statsCh <- stats
+	}()
+
+	return io.TeeReader(r, pw), sc
+}
diff --git a/internal/helper/unbuffered_start.go b/internal/helper/unbuffered_start.go
new file mode 100644
index 0000000000000000000000000000000000000000..b82b1f8efc55a2859c1898a8fdcf7fff8841b080
--- /dev/null
+++ b/internal/helper/unbuffered_start.go
@@ -0,0 +1,32 @@
+package helper
+
+import (
+	"bufio"
+)
+
+// UnbufferedStartWriter wraps a *bufio.Writer so that early writes flush after each write.
+type UnbufferedStartWriter struct {
+	w         *bufio.Writer
+	slowStart int64
+	n         int64
+}
+
+// NewUnbufferedStartWriter returns a new UnbufferedStartWriter. Early writes
+// automatically call Flush on the underlying writer. Once the total
+// number of bytes written exceeds slowStart, the automatic flushing
+// stops. Callers should always call Flush when they are done.
+func NewUnbufferedStartWriter(w *bufio.Writer, slowStart int64) *UnbufferedStartWriter {
+	return &UnbufferedStartWriter{w: w, slowStart: slowStart}
+}
+
+func (ssw *UnbufferedStartWriter) Write(p []byte) (int, error) {
+	n, err := ssw.w.Write(p)
+	ssw.n += int64(n)
+	if err == nil && ssw.n <= ssw.slowStart {
+		err = ssw.Flush()
+	}
+	return n, err
+}
+
+// Flush flushes the underlying bufio.Writer of ssw.
+func (ssw *UnbufferedStartWriter) Flush() error { return ssw.w.Flush() }
diff --git a/internal/helper/unbuffered_start_test.go b/internal/helper/unbuffered_start_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..70194c7e2e52425f4ce854e4772d73c0a6bedd63
--- /dev/null
+++ b/internal/helper/unbuffered_start_test.go
@@ -0,0 +1,31 @@
+package helper
+
+import (
+	"bufio"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+type writeLogger struct {
+	writes []string
+}
+
+func (wl *writeLogger) Write(p []byte) (int, error) {
+	wl.writes = append(wl.writes, string(p))
+	return len(p), nil
+}
+
+func TestUnbufferedStartWriter(t *testing.T) {
+	wl := &writeLogger{}
+	ssw := NewUnbufferedStartWriter(bufio.NewWriter(wl), 5)
+
+	for _, c := range []byte("hello world") {
+		n, err := ssw.Write([]byte{c})
+		require.NoError(t, err)
+		require.Equal(t, 1, n)
+	}
+	require.NoError(t, ssw.Flush())
+
+	require.Equal(t, []string{"h", "e", "l", "l", "o", " world"}, wl.writes)
+}