From be4f425ab2fe0c745af3fc4fc795b5fe1653954d Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Apr 2021 20:13:45 +0200 Subject: [PATCH 1/5] Optimize number of writes in pktline.SidebandWriter Reduces the number of syscalls per sideband write from 2 to 1. Exports the maximum frame size that will result in one syscall. --- internal/git/pktline/pkt_line_test.go | 35 +++++++++++++++++++++++++++ internal/git/pktline/pktline.go | 24 +++++++++--------- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/internal/git/pktline/pkt_line_test.go b/internal/git/pktline/pkt_line_test.go index 32694a7e0ab..dc262634f03 100644 --- a/internal/git/pktline/pkt_line_test.go +++ b/internal/git/pktline/pkt_line_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "io" + "io/ioutil" "math" "math/rand" "strings" @@ -334,3 +335,37 @@ func TestEachSidebandPacket(t *testing.T) { }) } } + +type writeCounter struct { + W io.Writer + N int +} + +func (wc *writeCounter) Write(p []byte) (int, error) { + wc.N++ + return wc.W.Write(p) +} + +func TestSidebandWriter_MaxSidebandFrameSize(t *testing.T) { + testCases := []struct { + desc string + size int + writes int + }{ + {desc: "maximum frame size", size: MaxSidebandFrameSize, writes: 1}, + {desc: "just over maximum frame size", size: MaxSidebandFrameSize + 1, writes: 2}, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + w := &writeCounter{W: ioutil.Discard} + sw := NewSidebandWriter(w) + + in := strings.Repeat("x", tc.size) + n, err := io.WriteString(sw.Writer(0), in) + require.NoError(t, err) + require.Equal(t, tc.size, n) + require.Equal(t, tc.writes, w.N) + }) + } +} diff --git a/internal/git/pktline/pktline.go b/internal/git/pktline/pktline.go index 665d6cbde64..c0f02d51328 100644 --- a/internal/git/pktline/pktline.go +++ b/internal/git/pktline/pktline.go @@ -13,8 +13,10 @@ import ( ) const ( - maxPktSize = 65520 // https://gitlab.com/gitlab-org/git/-/blob/v2.30.0/pkt-line.h#L216 - pktDelim = 
"0001" + // MaxSidebandFrameSize is the maximum number of bytes that fits in one Sideband frame + MaxSidebandFrameSize = maxPktSize - 5 + maxPktSize = 65520 // https://gitlab.com/gitlab-org/git/-/blob/v2.30.0/pkt-line.h#L216 + pktDelim = "0001" ) // NewScanner returns a bufio.Scanner that splits on Git pktline boundaries @@ -112,8 +114,9 @@ func pktLineSplitter(data []byte, atEOF bool) (advance int, token []byte, err er // SidebandWriter multiplexes byte streams into a single side-band-64k stream. type SidebandWriter struct { - w io.Writer - m sync.Mutex + w io.Writer + m sync.Mutex + buf [maxPktSize]byte } // NewSidebandWriter instantiates a new SidebandWriter. @@ -125,17 +128,12 @@ func (sw *SidebandWriter) writeBand(band byte, data []byte) (int, error) { n := 0 for len(data) > 0 { - chunkSize := len(data) const headerSize = 5 - if max := maxPktSize - headerSize; chunkSize > max { - chunkSize = max - } - - if _, err := fmt.Fprintf(sw.w, "%04x%s", chunkSize+headerSize, []byte{band}); err != nil { - return n, err - } + chunkSize := copy(sw.buf[headerSize:], data) + totalSize := chunkSize + headerSize + copy(sw.buf[:headerSize], fmt.Sprintf("%04x%s", totalSize, []byte{band})) - if _, err := sw.w.Write(data[:chunkSize]); err != nil { + if _, err := sw.w.Write(sw.buf[:totalSize]); err != nil { return n, err } data = data[chunkSize:] -- GitLab From 6c36165fcbf54316431c47562a699f1096f73d0f Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Apr 2021 20:26:25 +0200 Subject: [PATCH 2/5] Minimize writes in PackObjectsHook This minimizes the number of write syscalls we make in PackObjectsHook. 
--- internal/gitaly/service/hook/pack_objects.go | 26 ++++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 783749dbb0e..5b66f52094b 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -1,6 +1,7 @@ package hook import ( + "bufio" "bytes" "context" "crypto/sha256" @@ -177,9 +178,24 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb defer stdin.Close() sw := pktline.NewSidebandWriter(w) - stdout := &countingWriter{W: sw.Writer(bandStdout)} - stderrBuf := &bytes.Buffer{} - stderr := &countingWriter{W: io.MultiWriter(sw.Writer(bandStderr), stderrBuf)} + + // Using buffered IO makes the write end of the pipe more efficient + // (fewer syscalls), but more importantly it also makes the read end more + // efficient: each sideband packet we write becomes a gRPC message send + // on the read end, and typically each message send is a syscall. + // Syscalls saved on the read end are multiplied by the number of cache + // hits. + stdoutBufferedWriter := bufio.NewWriterSize( + sw.Writer(bandStdout), + pktline.MaxSidebandFrameSize, + ) + stdout := &countingWriter{W: stdoutBufferedWriter} + + // We intentionally do not buffer stderr. This is because + // git-pack-objects uses stderr to send keepalives, and those should be + // transmitted without delay. 
+ stderrSpy := &bytes.Buffer{} + stderr := &countingWriter{W: io.MultiWriter(sw.Writer(bandStderr), stderrSpy)} defer func() { generatedBytes := stdout.N + stderr.N @@ -201,10 +217,10 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb } if err := cmd.Wait(); err != nil { - return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrBuf.String(), err) + return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrSpy.String(), err) } - return nil + return stdoutBufferedWriter.Flush() } var ( -- GitLab From 895f5189915958dc9a3c71b1284621586d167a74 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Apr 2021 20:49:37 +0200 Subject: [PATCH 3/5] Add helper.UnbufferedStartWriter This type is intended to be used with PostUploadPack, which starts slow with progress messages but then speeds up to where it can use buffering. --- internal/helper/unbuffered_start.go | 32 ++++++++++++++++++++++++ internal/helper/unbuffered_start_test.go | 31 +++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 internal/helper/unbuffered_start.go create mode 100644 internal/helper/unbuffered_start_test.go diff --git a/internal/helper/unbuffered_start.go b/internal/helper/unbuffered_start.go new file mode 100644 index 00000000000..b82b1f8efc5 --- /dev/null +++ b/internal/helper/unbuffered_start.go @@ -0,0 +1,32 @@ +package helper + +import ( + "bufio" +) + +// UnbufferedStartWriter wraps a *bufio.Writer so that early writes flush after each write. +type UnbufferedStartWriter struct { + w *bufio.Writer + slowStart int64 + n int64 +} + +// NewUnbufferedStartWriter returns a new UnbufferedStartWriter. Early writes +// automatically call Flush on the underlying writer. Once the total +// number of bytes written exceeds slowStart, the automatic flushing +// stops. Callers should always call Flush when they are done. 
+func NewUnbufferedStartWriter(w *bufio.Writer, slowStart int64) *UnbufferedStartWriter { + return &UnbufferedStartWriter{w: w, slowStart: slowStart} +} + +func (ssw *UnbufferedStartWriter) Write(p []byte) (int, error) { + n, err := ssw.w.Write(p) + ssw.n += int64(n) + if err == nil && ssw.n <= ssw.slowStart { + err = ssw.Flush() + } + return n, err +} + +// Flush flushes the underlying bufio.Writer of ssw. +func (ssw *UnbufferedStartWriter) Flush() error { return ssw.w.Flush() } diff --git a/internal/helper/unbuffered_start_test.go b/internal/helper/unbuffered_start_test.go new file mode 100644 index 00000000000..70194c7e2e5 --- /dev/null +++ b/internal/helper/unbuffered_start_test.go @@ -0,0 +1,31 @@ +package helper + +import ( + "bufio" + "testing" + + "github.com/stretchr/testify/require" +) + +type writeLogger struct { + writes []string +} + +func (wl *writeLogger) Write(p []byte) (int, error) { + wl.writes = append(wl.writes, string(p)) + return len(p), nil +} + +func TestUnbufferedStartWriter(t *testing.T) { + wl := &writeLogger{} + ssw := NewUnbufferedStartWriter(bufio.NewWriter(wl), 5) + + for _, c := range []byte("hello world") { + n, err := ssw.Write([]byte{c}) + require.NoError(t, err) + require.Equal(t, 1, n) + } + require.NoError(t, ssw.Flush()) + + require.Equal(t, []string{"h", "e", "l", "l", "o", " world"}, wl.writes) +} -- GitLab From e539f028a55aa7d151811026ff6f8bb5716be2a5 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Wed, 28 Apr 2021 11:28:43 +0200 Subject: [PATCH 4/5] Factor out stats collector cleanup In https://gitlab.com/gitlab-org/gitaly/-/issues/3280 we learned that it is important that the stats goroutine does not outlive the request handler. The best way to ensure that is to use a defer statement, so that we can be sure that we wait for the goroutine to finish on all return paths from the handler. 
This change moves the stats goroutine code around a bit so that we can have a single defer statement, rather than a 'pw.Close(); <-statsCh' sequence that we had to repeat on every return path. --- .../gitaly/service/smarthttp/upload_pack.go | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index 1d77d5ca801..57b5ee501f1 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -1,6 +1,7 @@ package smarthttp import ( + "context" "crypto/sha1" "fmt" "io" @@ -36,22 +37,8 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS return resp.GetData(), err }), h) - pr, pw := io.Pipe() - defer pw.Close() - stdin := io.TeeReader(stdinReader, pw) - statsCh := make(chan stats.PackfileNegotiation, 1) - go func() { - defer close(statsCh) - - stats, err := stats.ParsePackfileNegotiation(pr) - if err != nil { - ctxlogrus.Extract(stream.Context()).WithError(err).Debug("failed parsing packfile negotiation") - return - } - stats.UpdateMetrics(s.packfileNegotiationMetrics) - - statsCh <- stats - }() + stdin, collector := s.wrapStatsCollector(stream.Context(), stdinReader) + defer collector.stats() var respBytes int64 @@ -96,8 +83,7 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS } if err := cmd.Wait(); err != nil { - pw.Close() // ensure PackfileNegotiation parser returns - stats := <-statsCh + stats := collector.stats() if _, ok := command.ExitStatus(err); ok && stats.Deepen != "" { // We have seen a 'deepen' message in the request.
It is expected that @@ -109,9 +95,6 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err) } - pw.Close() // Ensure PackfileNegotiation parser returns - <-statsCh // Wait for the packfile negotiation parser to finish. - ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details") return nil @@ -124,3 +107,36 @@ func validateUploadPackRequest(req *gitalypb.PostUploadPackRequest) error { return nil } + +type statsCollector struct { + c io.Closer + statsCh chan stats.PackfileNegotiation +} + +func (sc *statsCollector) stats() stats.PackfileNegotiation { + sc.c.Close() + return <-sc.statsCh +} + +func (s *server) wrapStatsCollector(ctx context.Context, r io.Reader) (io.Reader, *statsCollector) { + pr, pw := io.Pipe() + sc := &statsCollector{ + c: pw, + statsCh: make(chan stats.PackfileNegotiation, 1), + } + + go func() { + defer close(sc.statsCh) + + stats, err := stats.ParsePackfileNegotiation(pr) + if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Debug("failed parsing packfile negotiation") + return + } + stats.UpdateMetrics(s.packfileNegotiationMetrics) + + sc.statsCh <- stats + }() + + return io.TeeReader(r, pw), sc +} -- GitLab From 986f8b424e264fd2ff0d7375d618794aa29b10af Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Apr 2021 21:20:09 +0200 Subject: [PATCH 5/5] Reduce gRPC message sends in PostUploadPack On gitlab.com, PostUploadPack has to transfer a lot of data. Git upload-pack does writes of 8KB and by default, each of these writes would be forwarded as a gRPC message send. Although grpc-go may coalesce sends into larger (32KB) socket writes, testing suggests it often does not do this so you end up with an unnecessarily high number of write syscalls. 
--- .../gitaly/service/smarthttp/upload_pack.go | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index 57b5ee501f1..d60dee70a9f 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -1,6 +1,7 @@ package smarthttp import ( + "bufio" "context" "crypto/sha1" "fmt" @@ -11,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/git/stats" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/inspect" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/streamio" "google.golang.org/grpc/codes" @@ -42,10 +44,24 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS var respBytes int64 - stdoutWriter := streamio.NewWriter(func(p []byte) error { - respBytes += int64(len(p)) - return stream.Send(&gitalypb.PostUploadPackResponse{Data: p}) - }) + stdoutWriter := helper.NewUnbufferedStartWriter( + bufio.NewWriterSize( + streamio.NewWriter(func(p []byte) error { + respBytes += int64(len(p)) + return stream.Send(&gitalypb.PostUploadPackResponse{Data: p}) + }), + streamio.WriteBufferSize, + ), + // Git's progress messages "Enumerating objects" etc act as keepalives so + // they should not be delayed by buffering. This number of bytes should + // be large enough to hold the combined progress messages. + 32*1024, + ) + defer func() { + // In case of an early return, the output stream may contain messages for + // the user so we should still flush it. 
+ _ = stdoutWriter.Flush() + }() // TODO: it is first step of the https://gitlab.com/gitlab-org/gitaly/issues/1519 // needs to be removed after we get some statistics on this @@ -95,6 +111,10 @@ func (s *server) PostUploadPack(stream gitalypb.SmartHTTPService_PostUploadPackS return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err) } + if err := stdoutWriter.Flush(); err != nil { + return status.Errorf(codes.Unavailable, "PostUploadPack: %v", err) + } + ctxlogrus.Extract(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details") return nil -- GitLab