From 0205889113209f0fd704cb6d9d4f0f5a6ffb2826 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Jul 2021 16:05:31 +0000 Subject: [PATCH 1/2] Add pktline.SingleBandReader This helps us receive a stdin stream over a socket, including stdin EOF, while the socket remains open. EOF is signalled by a Git flush packet "0000". --- internal/git/pktline/pkt_line_test.go | 59 +++++++++++++++++++++++++++ internal/git/pktline/pktline.go | 41 +++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/internal/git/pktline/pkt_line_test.go b/internal/git/pktline/pkt_line_test.go index 32694a7e0ab..fb68062f26d 100644 --- a/internal/git/pktline/pkt_line_test.go +++ b/internal/git/pktline/pkt_line_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "io" + "io/ioutil" "math" "math/rand" "strings" @@ -334,3 +335,61 @@ func TestEachSidebandPacket(t *testing.T) { }) } } + +func TestSingleBandReader(t *testing.T) { + testCases := []struct { + desc string + in string + out string + err error + }{ + { + desc: "empty", + in: "0000", + out: "", + }, + { + desc: "valid stream", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz0000", + out: "foobarquxbaz", + }, + { + desc: "valid stream trailing garbage", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz0000 garbage!!", + out: "foobarquxbaz", + }, + { + desc: "valid stream except missing flush", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz", + err: io.ErrUnexpectedEOF, + }, + { + desc: "interrupted stream", + in: "ffff\x00hello world!!", + err: io.ErrUnexpectedEOF, + }, + { + desc: "stream without band", + in: "0004", + err: &errNotSideband{pkt: "0004"}, + }, + { + desc: "stream with wrong band", + in: "0005\x01", + err: errUnexpectedSideband(1), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + out, err := ioutil.ReadAll(SingleBandReader(strings.NewReader(tc.in), 0)) + if tc.err != nil { + require.Equal(t, tc.err, err) + return + } + + require.NoError(t, err) + require.Equal(t, 
tc.out, string(out)) + }) + } +} diff --git a/internal/git/pktline/pktline.go b/internal/git/pktline/pktline.go index 665d6cbde64..6ba3e24b7b2 100644 --- a/internal/git/pktline/pktline.go +++ b/internal/git/pktline/pktline.go @@ -10,6 +10,8 @@ import ( "io" "strconv" "sync" + + "gitlab.com/gitlab-org/gitaly/v14/streamio" ) const ( @@ -179,3 +181,42 @@ func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { return scanner.Err() } + +// SingleBandReader unwraps a flush-terminated sideband-64k stream. It +// expects a sequence of sideband packets all for the same band. The +// returned reader will return EOF when it encounters a flush packet. +// Anything else in the input stream will result in a read error. +func SingleBandReader(r io.Reader, band byte) io.Reader { + scanner := NewScanner(r) + + return streamio.NewReader(func() ([]byte, error) { + if !scanner.Scan() { + if err := scanner.Err(); err != nil { + return nil, err + } + return nil, io.ErrUnexpectedEOF + } + + data := scanner.Bytes() + + if IsFlush(data) { + return nil, io.EOF + } + + if len(data) < 5 { + return nil, &errNotSideband{string(data)} + } + + if b := data[4]; b != band { + return nil, errUnexpectedSideband(b) + } + + return data[5:], nil + }) +} + +type errUnexpectedSideband byte + +func (b errUnexpectedSideband) Error() string { + return fmt.Sprintf("unexpected band: %d", b) +} -- GitLab From 0b050e2a691bdf55974fbae13c7afccaabbd9213 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Tue, 29 Jun 2021 16:24:36 +0000 Subject: [PATCH 2/2] Require trailing flush in pktline.EachSidebandPacket This changes the behavior of pktline.EachSidebandPacket to expect a trailing flush packet. The old behavior was to expect the stream to end on a packet boundary. This is part of https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/463, where we need to be able to signal "end of stream" in a natural way and we cannot use EOF. 
There is only one call site for EachSidebandPacket, and that call site (PackObjectsHook) only consumes byte streams produced by the same Gitaly process. In other words, this change is safe in spite of being incompatible with the old behavior, because it can never be exposed to the old behavior. Changelog: other --- internal/git/pktline/pktline.go | 16 ++++++++++++---- .../{pkt_line_test.go => pktline_test.go} | 14 +++++++++++++- internal/gitaly/service/hook/pack_objects.go | 4 ++++ 3 files changed, 29 insertions(+), 5 deletions(-) rename internal/git/pktline/{pkt_line_test.go => pktline_test.go} (95%) diff --git a/internal/git/pktline/pktline.go b/internal/git/pktline/pktline.go index 6ba3e24b7b2..532fba8577b 100644 --- a/internal/git/pktline/pktline.go +++ b/internal/git/pktline/pktline.go @@ -163,13 +163,17 @@ type errNotSideband struct{ pkt string } func (err *errNotSideband) Error() string { return fmt.Sprintf("invalid sideband packet: %q", err.pkt) } -// EachSidebandPacket iterates over a side-band-64k pktline stream. For -// each packet, it will call fn with the band ID and the packet. Fn must -// not retain the packet. +// EachSidebandPacket iterates over a side-band-64k pktline stream until +// it reaches a flush packet. For each packet, it will call fn with the +// band ID and the packet. Fn must not retain the packet. func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { scanner := NewScanner(r) for scanner.Scan() { + if IsFlush(scanner.Bytes()) { + return nil + } + data := Data(scanner.Bytes()) if len(data) == 0 { return &errNotSideband{scanner.Text()} @@ -179,7 +183,11 @@ func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { } } - return scanner.Err() + if err := scanner.Err(); err != nil { + return err + } + + return io.ErrUnexpectedEOF } // SingleBandReader unwraps a flush-terminated sideband-64k stream. 
It diff --git a/internal/git/pktline/pkt_line_test.go b/internal/git/pktline/pktline_test.go similarity index 95% rename from internal/git/pktline/pkt_line_test.go rename to internal/git/pktline/pktline_test.go index fb68062f26d..dd26ab875ce 100644 --- a/internal/git/pktline/pkt_line_test.go +++ b/internal/git/pktline/pktline_test.go @@ -280,16 +280,23 @@ func TestEachSidebandPacket(t *testing.T) { }{ { desc: "empty", + in: "0000", out: map[byte]string{}, }, { desc: "empty with failing callback: callback does not run", + in: "0000", out: map[byte]string{}, callback: func(byte, []byte) error { panic("oh no") }, }, { desc: "valid stream", - in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz0000", + out: map[byte]string{0: "foo", 1: "bar", 254: "qux", 255: "baz"}, + }, + { + desc: "valid stream trailing garbage", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz0000 garbage!!", out: map[byte]string{0: "foo", 1: "bar", 254: "qux", 255: "baz"}, }, { @@ -298,6 +305,11 @@ func TestEachSidebandPacket(t *testing.T) { callback: func(byte, []byte) error { return callbackError }, err: callbackError, }, + { + desc: "valid stream except missing flush", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz", + err: io.ErrUnexpectedEOF, + }, { desc: "interrupted stream", in: "ffff\x10hello world!!", diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 82c396503b4..f1be31c1d18 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -204,6 +204,10 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrBuf.String(), err) } + if err := pktline.WriteFlush(w); err != nil { + return err + } + return nil } -- GitLab