diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go index 03b33ddb8a65c4d1672f1461c1e53ff053434879..1b21a4d46cdc568b97c0ddbfb80b7691b022a8cf 100644 --- a/cmd/gitaly-hooks/hooks.go +++ b/cmd/gitaly-hooks/hooks.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "net" "os" "strings" @@ -13,6 +14,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" @@ -21,6 +23,7 @@ import ( gitalylog "gitlab.com/gitlab-org/gitaly/v14/internal/log" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/stream" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "gitlab.com/gitlab-org/labkit/tracing" @@ -339,9 +342,17 @@ func packObjectsHook(ctx context.Context, payload git.HooksPayload, hookClient g fixedArgs = append(fixedArgs, fixFilterQuoteBug(a)) } - if err := handlePackObjects(ctx, hookClient, payload.Repo, fixedArgs); err != nil { - logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHook RPC failed") - return 1, nil + switch os.Getenv("GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM") { + case "1": + if err := handlePackObjectsStream(ctx, payload, fixedArgs); err != nil { + logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHookStream RPC failed") + return 1, nil + } + default: + if err := handlePackObjects(ctx, hookClient, payload.Repo, fixedArgs); err != nil { + logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHook RPC failed") + return 1, nil + } } return 0, nil @@ -406,3 
+417,46 @@ type nopExitStatus struct { } func (nopExitStatus) GetExitStatus() *gitalypb.ExitStatus { return nil } + +func handlePackObjectsStream(ctx context.Context, payload git.HooksPayload, args []string) error { + req := &gitalypb.PackObjectsHookStreamRequest{ + Repository: payload.Repo, + Args: args, + } + + callback := func(c net.Conn) error { + if _, err := io.Copy( + pktline.NewSidebandWriter(c).Writer(0), + os.Stdin, + ); err != nil { + return err + } + if err := pktline.WriteFlush(c); err != nil { + return err + } + + return pktline.EachSidebandPacket(c, func(band byte, data []byte) error { + var err error + switch band { + case 1: + _, err = os.Stdout.Write(data) + case 2: + _, err = os.Stderr.Write(data) + default: + err = fmt.Errorf("unexpected side band: %d", band) + } + return err + }) + } + + return streamrpc.Call( + ctx, + streamrpc.DialNet("unix://"+payload.InternalSocket), + "/gitaly.HookService/PackObjectsHookStream", + req, + callback, + streamrpc.WithCredentials( + gitalyauth.RPCCredentialsV2(payload.InternalSocketToken), + ), + ) +} diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go index d0f219bc8b74f2c7cb1912e5c2cdd184a4ed31bd..c5ccdadce4b0d075eac1302bbb705acd9ad953fb 100644 --- a/cmd/gitaly-hooks/hooks_test.go +++ b/cmd/gitaly-hooks/hooks_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/command" @@ -33,6 +34,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" ) type glHookValues struct { @@ -567,8 +569,8 @@ func TestCheckBadCreds(t *testing.T) { require.Regexp(t, `Checking GitLab API access: .* level=error msg="Internal API error" .* error="authorization failed" method=GET status=401 
url="http://127.0.0.1:[0-9]+/api/v4/internal/check"\nFAIL`, stdout.String()) } -func runHookServiceServer(t *testing.T, cfg config.Cfg) { - runHookServiceWithGitlabClient(t, cfg, gitlab.NewMockClient()) +func runHookServiceServer(t *testing.T, cfg config.Cfg, serverOpts ...testserver.GitalyServerOpt) { + runHookServiceWithGitlabClient(t, cfg, gitlab.NewMockClient(), serverOpts...) } type featureFlagAsserter struct { @@ -607,12 +609,18 @@ func (svc featureFlagAsserter) PackObjectsHook(stream gitalypb.HookService_PackO return svc.wrapped.PackObjectsHook(stream) } -func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client) { - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { +func (svc featureFlagAsserter) PackObjectsHookStream(ctx context.Context, req *gitalypb.PackObjectsHookStreamRequest) (*emptypb.Empty, error) { + svc.assertFlags(ctx) + return svc.wrapped.PackObjectsHookStream(ctx, req) +} + +func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client, serverOpts ...testserver.GitalyServerOpt) { + serverOpts = append(serverOpts, testserver.WithGitLabClient(gitlabClient)) + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, featureFlagAsserter{ t: t, wrapped: hook.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory()), }) - }, testserver.WithGitLabClient(gitlabClient)) + }, serverOpts...) 
} func requireContainsOnce(t *testing.T, s string, contains string) { @@ -666,25 +674,45 @@ func TestGitalyHooksPackObjects(t *testing.T) { testCases := []struct { desc string extraArgs []string + extraEnv []string + method string }{ - {desc: "regular clone"}, - {desc: "shallow clone", extraArgs: []string{"--depth=1"}}, - {desc: "partial clone", extraArgs: []string{"--filter=blob:none"}}, + {desc: "regular clone", method: "PackObjectsHook"}, + {desc: "shallow clone", extraArgs: []string{"--depth=1"}, method: "PackObjectsHook"}, + {desc: "partial clone", extraArgs: []string{"--filter=blob:none"}, method: "PackObjectsHook"}, + { + desc: "regular clone StreamRPC", + extraEnv: []string{"GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM=1"}, + method: "PackObjectsHookStream", + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - runHookServiceServer(t, cfg) + logger, hook := test.NewNullLogger() + runHookServiceServer(t, cfg, testserver.WithLogger(logger)) tempDir := testhelper.TempDir(t) args := append(baseArgs[1:], tc.extraArgs...) args = append(args, repoPath, tempDir) cmd := exec.Command(baseArgs[0], args...) - cmd.Env = env + cmd.Env = append(env, tc.extraEnv...) 
cmd.Stderr = os.Stderr require.NoError(t, cmd.Run()) + + foundMethod := false + for _, e := range hook.AllEntries() { + t.Log(e.Data) + if e.Data["grpc.service"] != "gitaly.HookService" { + continue + } + + require.Equal(t, tc.method, e.Data["grpc.method"]) + foundMethod = true + } + require.True(t, foundMethod) }) } } diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go index 777b0757d65c87b5b9cb426c7e33f3347f7156f0..cce031c4ca4dcc65e79a76e8ce014d34ba29c55e 100644 --- a/cmd/gitaly-ssh/auth_test.go +++ b/cmd/gitaly-ssh/auth_test.go @@ -23,6 +23,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -152,7 +153,8 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string, hookManager := hook.NewManager(locator, txManager, gitlab.NewMockClient(), cfg) gitCmdFactory := git.NewExecCommandFactory(cfg) diskCache := cache.New(cfg, locator) - srv, err := server.New(secure, cfg, testhelper.DiscardTestEntry(t), registry, diskCache) + streamRPCServer := streamrpc.NewServer() + srv, err := server.New(secure, cfg, testhelper.DiscardTestEntry(t), registry, diskCache, streamRPCServer) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ Cfg: cfg, diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index dd424300376598f19ff28c8f0a9b7c887e1aacc8..be48a6c203ba49e4c778288bee559952e47a97b7 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -29,6 +29,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" glog "gitlab.com/gitlab-org/gitaly/v14/internal/log" "gitlab.com/gitlab-org/gitaly/v14/internal/storage" + 
"gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/internal/version" "gitlab.com/gitlab-org/labkit/monitoring" @@ -196,6 +197,19 @@ func run(cfg config.Cfg) error { } defer rubySrv.Stop() + deps := &service.Dependencies{ + Cfg: cfg, + RubyServer: rubySrv, + GitalyHookManager: hookManager, + TransactionManager: transactionManager, + StorageLocator: locator, + ClientPool: conns, + GitCmdFactory: gitCmdFactory, + Linguist: ling, + CatfileCache: catfileCache, + DiskCache: diskCache, + } + for _, c := range []starter.Config{ {Name: starter.Unix, Addr: cfg.SocketPath, HandoverOnUpgrade: true}, {Name: starter.Unix, Addr: cfg.GitalyInternalSocketPath(), HandoverOnUpgrade: false}, @@ -206,32 +220,23 @@ func run(cfg config.Cfg) error { continue } - var srv *grpc.Server + var grpcSrv *grpc.Server + var srpcSrv *streamrpc.Server if c.HandoverOnUpgrade { - srv, err = gitalyServerFactory.CreateExternal(c.IsSecure()) + grpcSrv, srpcSrv, err = gitalyServerFactory.CreateExternal(c.IsSecure()) if err != nil { return fmt.Errorf("create external gRPC server: %w", err) } } else { - srv, err = gitalyServerFactory.CreateInternal() + grpcSrv, srpcSrv, err = gitalyServerFactory.CreateInternal() if err != nil { return fmt.Errorf("create internal gRPC server: %w", err) } } - setup.RegisterAll(srv, &service.Dependencies{ - Cfg: cfg, - RubyServer: rubySrv, - GitalyHookManager: hookManager, - TransactionManager: transactionManager, - StorageLocator: locator, - ClientPool: conns, - GitCmdFactory: gitCmdFactory, - Linguist: ling, - CatfileCache: catfileCache, - DiskCache: diskCache, - }) - b.RegisterStarter(starter.New(c, srv)) + setup.RegisterAll(grpcSrv, deps) + setup.RegisterAll(srpcSrv, deps) + b.RegisterStarter(starter.New(c, grpcSrv)) } if addr := cfg.PrometheusListenAddr; addr != "" { diff --git a/internal/git/hooks_options.go b/internal/git/hooks_options.go index 
7ad3ff9f70a6d7cee6665d85f184b53711131a15..b8b2789046f4f5819f526ecdc60f407e9a2f5c00 100644 --- a/internal/git/hooks_options.go +++ b/internal/git/hooks_options.go @@ -66,6 +66,10 @@ func WithPackObjectsHookEnv(ctx context.Context, repo *gitalypb.Repository, cfg Value: filepath.Join(cfg.BinDir, "gitaly-hooks"), }) + if featureflag.PackObjectsHookStream.IsEnabled(ctx) { + cc.env = append(cc.env, "GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM=1") + } + return nil } } diff --git a/internal/git/pktline/pktline.go b/internal/git/pktline/pktline.go index 665d6cbde64d5a1d7e4843be2e7b5f2c5ed72e99..54cea3e81bbdab64fec0ad6fa269104f9d2d781b 100644 --- a/internal/git/pktline/pktline.go +++ b/internal/git/pktline/pktline.go @@ -10,6 +10,8 @@ import ( "io" "strconv" "sync" + + "gitlab.com/gitlab-org/gitaly/v14/streamio" ) const ( @@ -161,13 +163,17 @@ type errNotSideband struct{ pkt string } func (err *errNotSideband) Error() string { return fmt.Sprintf("invalid sideband packet: %q", err.pkt) } -// EachSidebandPacket iterates over a side-band-64k pktline stream. For -// each packet, it will call fn with the band ID and the packet. Fn must -// not retain the packet. +// EachSidebandPacket iterates over a side-band-64k pktline stream until +// it reaches a flush packet. For each packet, it will call fn with the +// band ID and the packet. Fn must not retain the packet. func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { scanner := NewScanner(r) for scanner.Scan() { + if IsFlush(scanner.Bytes()) { + return nil + } + data := Data(scanner.Bytes()) if len(data) == 0 { return &errNotSideband{scanner.Text()} @@ -177,5 +183,44 @@ func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { } } - return scanner.Err() + if err := scanner.Err(); err != nil { + return err + } + + return io.ErrUnexpectedEOF +} + +// SingleBandReader unwraps a flush-terminated sideband-64k stream. It +// expects a sequence of sideband packets all for the same band. 
The +// returned reader will return EOF when it encounters a flush packet. +// Anything else in the input stream will result in a read error. +func SingleBandReader(r io.Reader, band byte) io.Reader { + scanner := NewScanner(r) + + return streamio.NewReader(func() ([]byte, error) { + if !scanner.Scan() { + return nil, io.ErrUnexpectedEOF + } + data := scanner.Bytes() + + if IsFlush(data) { + return nil, io.EOF + } + + if len(data) < 5 { + return nil, &errNotSideband{string(data)} + } + + if b := data[4]; b != band { + return nil, errUnexpectedSideband(b) + } + + return data[5:], nil + }) +} + +type errUnexpectedSideband byte + +func (b errUnexpectedSideband) Error() string { + return fmt.Sprintf("unexpected band: %d", b) } diff --git a/internal/git/pktline/pkt_line_test.go b/internal/git/pktline/pktline_test.go similarity index 81% rename from internal/git/pktline/pkt_line_test.go rename to internal/git/pktline/pktline_test.go index 32694a7e0ab4eac1d122d50f873a0585fa2d9ae0..dd26ab875ce127a9632af6ec1ab3ed985e58a38b 100644 --- a/internal/git/pktline/pkt_line_test.go +++ b/internal/git/pktline/pktline_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "io" + "io/ioutil" "math" "math/rand" "strings" @@ -279,16 +280,23 @@ func TestEachSidebandPacket(t *testing.T) { }{ { desc: "empty", + in: "0000", out: map[byte]string{}, }, { desc: "empty with failing callback: callback does not run", + in: "0000", out: map[byte]string{}, callback: func(byte, []byte) error { panic("oh no") }, }, { desc: "valid stream", - in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz0000", + out: map[byte]string{0: "foo", 1: "bar", 254: "qux", 255: "baz"}, + }, + { + desc: "valid stream trailing garbage", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz0000 garbage!!", out: map[byte]string{0: "foo", 1: "bar", 254: "qux", 255: "baz"}, }, { @@ -297,6 +305,11 @@ func TestEachSidebandPacket(t *testing.T) { callback: func(byte, []byte) error { 
return callbackError }, err: callbackError, }, + { + desc: "valid stream except missing flush", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz", + err: io.ErrUnexpectedEOF, + }, { desc: "interrupted stream", in: "ffff\x10hello world!!", @@ -334,3 +347,61 @@ func TestEachSidebandPacket(t *testing.T) { }) } } + +func TestSingleBandReader(t *testing.T) { + testCases := []struct { + desc string + in string + out string + err error + }{ + { + desc: "empty", + in: "0000", + out: "", + }, + { + desc: "valid stream", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz0000", + out: "foobarquxbaz", + }, + { + desc: "valid stream trailing garbage", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz0000 garbage!!", + out: "foobarquxbaz", + }, + { + desc: "valid stream except missing flush", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz", + err: io.ErrUnexpectedEOF, + }, + { + desc: "interrupted stream", + in: "ffff\x00hello world!!", + err: io.ErrUnexpectedEOF, + }, + { + desc: "stream without band", + in: "0004", + err: &errNotSideband{pkt: "0004"}, + }, + { + desc: "stream with wrong band", + in: "0005\x01", + err: errUnexpectedSideband(1), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + out, err := ioutil.ReadAll(SingleBandReader(strings.NewReader(tc.in), 0)) + if tc.err != nil { + require.Equal(t, tc.err, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc.out, string(out)) + }) + } +} diff --git a/internal/git/remoterepo/repository_test.go b/internal/git/remoterepo/repository_test.go index b74f88f75813c1df9ac28c645440cb83eac086c8..aff19797da35885729fb4e958e31e8bb2a292819 100644 --- a/internal/git/remoterepo/repository_test.go +++ b/internal/git/remoterepo/repository_test.go @@ -22,7 +22,7 @@ import ( func TestRepository(t *testing.T) { cfg := testcfg.Build(t) - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := 
testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/git/updateref/update_with_hooks_test.go b/internal/git/updateref/update_with_hooks_test.go index f5740c70c82721a17a1026b11dccfe0b5aed0f09..da0e40dd26d0128a588a6f0b00b442d218169b50 100644 --- a/internal/git/updateref/update_with_hooks_test.go +++ b/internal/git/updateref/update_with_hooks_test.go @@ -97,7 +97,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { // We need to set up a separate "real" hook service here, as it will be used in // git-update-ref(1) spawned by `updateRefWithHooks()` - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory())) }) diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 382d8fb931f7775670bb3bba709f87e526761b6e..0f642c1d5ac585b4a32cfd0989a793a4bd4a05e5 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -25,8 +25,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -86,30 +88,54 @@ func TestTLSSanity(t 
*testing.T) { func TestAuthFailures(t *testing.T) { testCases := []struct { - desc string - opts []grpc.DialOption - code codes.Code + desc string + creds credentials.PerRPCCredentials + code codes.Code + message string }{ - {desc: "no auth", opts: nil, code: codes.Unauthenticated}, { - desc: "invalid auth", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(brokenAuth{})}, - code: codes.Unauthenticated, + desc: "no auth", + creds: nil, + code: codes.Unauthenticated, + message: "rpc error: code = Unauthenticated desc = authentication required", }, { - desc: "wrong secret", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("foobar"))}, - code: codes.PermissionDenied, + desc: "invalid auth", + creds: brokenAuth{}, + code: codes.Unauthenticated, + message: "rpc error: code = Unauthenticated desc = authentication required", + }, + { + desc: "wrong secret", + creds: gitalyauth.RPCCredentialsV2("foobar"), + code: codes.PermissionDenied, + message: "rpc error: code = PermissionDenied desc = permission denied", }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - serverSocketPath := runServer(t, config.Cfg{Auth: auth.Config{Token: "quxbaz"}}) - connOpts := append(tc.opts, grpc.WithInsecure()) + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{ + Auth: auth.Config{Token: "quxbaz"}, + })) + serverSocketPath := runServer(t, cfg) + + // Make a healthcheck gRPC call + connOpts := []grpc.DialOption{grpc.WithInsecure()} + if tc.creds != nil { + connOpts = append(connOpts, grpc.WithPerRPCCredentials(tc.creds)) + } conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) t.Cleanup(func() { conn.Close() }) testhelper.RequireGrpcError(t, healthCheck(conn), tc.code) + + // // Make a streamRPC call + var callOpts []streamrpc.CallOption + if tc.creds != nil { + callOpts = append(callOpts, streamrpc.WithCredentials(tc.creds)) + } + _, _, err = checkStreamRPC(t, 
streamrpc.DialNet(serverSocketPath), repo, callOpts...) + require.EqualError(t, err, tc.message) }) } } @@ -119,40 +145,54 @@ func TestAuthSuccess(t *testing.T) { testCases := []struct { desc string - opts []grpc.DialOption + creds credentials.PerRPCCredentials required bool token string }{ {desc: "no auth, not required"}, { desc: "v2 correct auth, not required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))}, + creds: gitalyauth.RPCCredentialsV2(token), token: token, }, { desc: "v2 incorrect auth, not required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("incorrect"))}, + creds: gitalyauth.RPCCredentialsV2("incorrect"), token: token, }, { desc: "v2 correct auth, required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))}, + creds: gitalyauth.RPCCredentialsV2(token), token: token, required: true, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{ Auth: auth.Config{Token: tc.token, Transitioning: !tc.required}, })) serverSocketPath := runServer(t, cfg) - connOpts := append(tc.opts, grpc.WithInsecure()) + + // Make a healthcheck gRPC call + connOpts := []grpc.DialOption{grpc.WithInsecure()} + if tc.creds != nil { + connOpts = append(connOpts, grpc.WithPerRPCCredentials(tc.creds)) + } conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) t.Cleanup(func() { conn.Close() }) assert.NoError(t, healthCheck(conn), tc.desc) + + // // Make a streamRPC call + var callOpts []streamrpc.CallOption + if tc.creds != nil { + callOpts = append(callOpts, streamrpc.WithCredentials(tc.creds)) + } + in, out, err := checkStreamRPC(t, streamrpc.DialNet(serverSocketPath), repo, callOpts...) 
+ require.NoError(t, err) + require.Equal(t, in, out) }) } } @@ -201,8 +241,10 @@ func runServer(t *testing.T, cfg config.Cfg) string { gitCmdFactory := git.NewExecCommandFactory(cfg) catfileCache := catfile.NewCache(cfg) diskCache := cache.New(cfg, locator) + streamRPCServer := streamrpc.NewServer() + gitalypb.RegisterTestStreamServiceServer(streamRPCServer, teststream.NewServer(locator)) - srv, err := New(false, cfg, testhelper.DiscardTestEntry(t), registry, diskCache) + srv, err := New(false, cfg, testhelper.DiscardTestEntry(t), registry, diskCache, streamRPCServer) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ @@ -236,7 +278,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { conns := client.NewPool() t.Cleanup(func() { conns.Close() }) - srv, err := New(true, cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + srv, err := New(true, cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), streamrpc.NewServer()) require.NoError(t, err) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index ffdf55c10dd847dc9a5042b6dd4eca617a1b892b..82b48f958e35423f61c1cb997731522cd66b7171 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -28,6 +28,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" "google.golang.org/grpc" @@ -74,6 +75,7 @@ func New( logrusEntry *log.Entry, registry *backchannel.Registry, cacheInvalidator diskcache.Invalidator, + streamRPCServer *streamrpc.Server, ) 
(*grpc.Server, error) { ctxTagOpts := []grpcmwtags.Option{ grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), @@ -96,53 +98,63 @@ func New( }) } + serverStreamInterceptorChain := grpcmw.ChainStreamServer( + grpcmwtags.StreamServerInterceptor(ctxTagOpts...), + grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.StreamInterceptor, + grpcprometheus.StreamServerInterceptor, + commandstatshandler.StreamInterceptor, + grpcmwlogrus.StreamServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + sentryhandler.StreamLogHandler, + cancelhandler.Stream, // Should be below LogHandler + auth.StreamServerInterceptor(cfg.Auth), + lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + grpctracing.StreamServerTracingInterceptor(), + cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.StreamPanicHandler, + ) + + serverUnaryInterceptorChain := grpcmw.ChainUnaryServer( + grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), + grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.UnaryInterceptor, + grpcprometheus.UnaryServerInterceptor, + commandstatshandler.UnaryInterceptor, + grpcmwlogrus.UnaryServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + sentryhandler.UnaryLogHandler, + cancelhandler.Unary, // Should be below LogHandler + auth.UnaryServerInterceptor(cfg.Auth), + lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + 
grpctracing.UnaryServerTracingInterceptor(), + cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.UnaryPanicHandler, + ) + + streamRPCServer.UseInterceptor(serverUnaryInterceptorChain) + lm := listenmux.New(transportCredentials) lm.Register(backchannel.NewServerHandshaker( logrusEntry, registry, []grpc.DialOption{client.UnaryInterceptor()}, )) + lm.Register(streamrpc.NewServerHandshaker( + streamRPCServer, + gitalylog.Default(), + )) opts := []grpc.ServerOption{ grpc.Creds(lm), - grpc.StreamInterceptor(grpcmw.ChainStreamServer( - grpcmwtags.StreamServerInterceptor(ctxTagOpts...), - grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.StreamInterceptor, - grpcprometheus.StreamServerInterceptor, - commandstatshandler.StreamInterceptor, - grpcmwlogrus.StreamServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), - sentryhandler.StreamLogHandler, - cancelhandler.Stream, // Should be below LogHandler - auth.StreamServerInterceptor(cfg.Auth), - lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.StreamServerTracingInterceptor(), - cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.StreamPanicHandler, - )), - grpc.UnaryInterceptor(grpcmw.ChainUnaryServer( - grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), - grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.UnaryInterceptor, - grpcprometheus.UnaryServerInterceptor, - commandstatshandler.UnaryInterceptor, - 
grpcmwlogrus.UnaryServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), - sentryhandler.UnaryLogHandler, - cancelhandler.Unary, // Should be below LogHandler - auth.UnaryServerInterceptor(cfg.Auth), - lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.UnaryServerTracingInterceptor(), - cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.UnaryPanicHandler, - )), + grpc.StreamInterceptor(serverStreamInterceptorChain), + grpc.UnaryInterceptor(serverUnaryInterceptorChain), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 20 * time.Second, PermitWithoutStream: true, diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 2b2d9e73dfdf5f3f67b4831d1bb9d52346180245..bae8fad63740e54e7c2db6bfa953ef8b138b7e1b 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -133,26 +134,30 @@ func (s *GitalyServerFactory) GracefulStop() { } } -// CreateExternal creates a new external gRPC server. The external servers are closed +// CreateExternal creates a new external gRPC server and StreamRPC server. The external servers are closed // before the internal servers when gracefully shutting down. 
-func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) { - server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator) +func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, *streamrpc.Server, error) { + streamRPCServer := streamrpc.NewServer() + grpcServer, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, streamRPCServer) if err != nil { - return nil, err + return nil, nil, err } - s.externalServers = append(s.externalServers, server) - return server, nil + s.externalServers = append(s.externalServers, grpcServer) + + return grpcServer, streamRPCServer, nil } -// CreateInternal creates a new internal gRPC server. Internal servers are closed +// CreateInternal creates a new internal gRPC server and StreamRPC server. Internal servers are closed // after the external ones when gracefully shutting down. -func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) { - server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator) +func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, *streamrpc.Server, error) { + streamRPCServer := streamrpc.NewServer() + grpcServer, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, streamRPCServer) if err != nil { - return nil, err + return nil, nil, err } - s.internalServers = append(s.internalServers, server) - return server, nil + s.internalServers = append(s.internalServers, grpcServer) + + return grpcServer, streamRPCServer, nil } diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index c851c32d7c5f12d0d680125b2600656d277663c9..34c9c46c376cba590749e627a53ab084e74e26aa 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -1,10 +1,14 @@ package server import ( + "bytes" "context" "crypto/tls" "crypto/x509" "errors" + "io" + "io/ioutil" + "math/rand" "net" "os" "testing" @@ -16,8 +20,11 @@ 
import ( "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -30,71 +37,24 @@ func TestGitalyServerFactory(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - checkHealth := func(t *testing.T, sf *GitalyServerFactory, schema, addr string) healthpb.HealthClient { - t.Helper() - - var cc *grpc.ClientConn - if schema == starter.TLS { - listener, err := net.Listen(starter.TCP, addr) - require.NoError(t, err) - t.Cleanup(func() { listener.Close() }) - - srv, err := sf.CreateExternal(true) - require.NoError(t, err) - healthpb.RegisterHealthServer(srv, health.NewServer()) - go srv.Serve(listener) - - certPool, err := x509.SystemCertPool() - require.NoError(t, err) - - pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath) - require.True(t, certPool.AppendCertsFromPEM(pem)) - - creds := credentials.NewTLS(&tls.Config{ - RootCAs: certPool, - MinVersion: tls.VersionTLS12, - }) - - cc, err = grpc.DialContext(ctx, listener.Addr().String(), grpc.WithTransportCredentials(creds)) - require.NoError(t, err) - } else { - listener, err := net.Listen(schema, addr) - require.NoError(t, err) - t.Cleanup(func() { listener.Close() }) - - srv, err := sf.CreateExternal(false) - require.NoError(t, err) - healthpb.RegisterHealthServer(srv, health.NewServer()) - go srv.Serve(listener) - - endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String()) - require.NoError(t, err) - - cc, err = client.Dial(endpoint, nil) - require.NoError(t, err) - } - - 
t.Cleanup(func() { cc.Close() }) - - healthClient := healthpb.NewHealthClient(cc) + t.Run("insecure over TCP", func(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) + sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) - resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{}) - require.NoError(t, err) - require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status) - return healthClient - } + check(t, ctx, sf, cfg, repo, starter.TCP, "localhost:0") + }) - t.Run("insecure", func(t *testing.T) { - cfg := testcfg.Build(t) + t.Run("insecure over Unix Socket", func(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) - checkHealth(t, sf, starter.TCP, "localhost:0") + check(t, ctx, sf, cfg, repo, starter.Unix, testhelper.GetTemporaryGitalySocketFileName(t)) }) t.Run("secure", func(t *testing.T) { certFile, keyFile := testhelper.GenerateCerts(t) - cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{TLS: config.TLS{ + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{TLS: config.TLS{ CertPath: certFile, KeyPath: keyFile, }})) @@ -102,20 +62,20 @@ func TestGitalyServerFactory(t *testing.T) { sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) - checkHealth(t, sf, starter.TLS, "localhost:0") + check(t, ctx, sf, cfg, repo, starter.TLS, "localhost:0") }) t.Run("all services must be stopped", func(t *testing.T) { - cfg := testcfg.Build(t) + cfg, repo, _ := testcfg.BuildWithRepo(t) sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) - tcpHealthClient := checkHealth(t, sf, starter.TCP, "localhost:0") + tcpHealthClient := 
check(t, ctx, sf, cfg, repo, starter.TCP, "localhost:0") socket := testhelper.GetTemporaryGitalySocketFileName(t) t.Cleanup(func() { require.NoError(t, os.RemoveAll(socket)) }) - socketHealthClient := checkHealth(t, sf, starter.Unix, socket) + socketHealthClient := check(t, ctx, sf, cfg, repo, starter.Unix, socket) sf.GracefulStop() // stops all started servers(listeners) @@ -185,7 +145,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { }{ { createServer: func() *grpc.Server { - server, err := sf.CreateInternal() + server, _, err := sf.CreateInternal() require.NoError(t, err) return server }, @@ -195,7 +155,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { }, { createServer: func() *grpc.Server { - server, err := sf.CreateExternal(false) + server, _, err := sf.CreateExternal(false) require.NoError(t, err) return server }, @@ -287,3 +247,117 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { // wait until the graceful shutdown completes <-shutdownCompeleted } + +func check(t *testing.T, ctx context.Context, sf *GitalyServerFactory, cfg config.Cfg, repo *gitalypb.Repository, schema, addr string) healthpb.HealthClient { + t.Helper() + + var grpcConn *grpc.ClientConn + var streamRPCDial streamrpc.DialFunc + + if schema == starter.TLS { + listener, err := net.Listen(starter.TCP, addr) + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + grpcSrv, srpcSrv, err := sf.CreateExternal(true) + require.NoError(t, err) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) + registerStreamRPCServers(t, srpcSrv, cfg) + go grpcSrv.Serve(listener) + + certPool, err := x509.SystemCertPool() + require.NoError(t, err) + + pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath) + require.True(t, certPool.AppendCertsFromPEM(pem)) + + tlsConf := &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS12, + } + creds := credentials.NewTLS(tlsConf) + + streamRPCDial = streamrpc.DialTLS(listener.Addr().String(), tlsConf) + grpcConn, 
err = grpc.DialContext(ctx, listener.Addr().String(), grpc.WithTransportCredentials(creds)) + require.NoError(t, err) + } else { + listener, err := net.Listen(schema, addr) + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + grpcSrv, srpcSrv, err := sf.CreateExternal(false) + require.NoError(t, err) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) + registerStreamRPCServers(t, srpcSrv, cfg) + go grpcSrv.Serve(listener) + + endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String()) + require.NoError(t, err) + + streamRPCDial = streamrpc.DialNet(endpoint) + grpcConn, err = client.Dial(endpoint, nil) + require.NoError(t, err) + } + + // Make a healthcheck gRPC call + t.Cleanup(func() { grpcConn.Close() }) + healthClient := healthpb.NewHealthClient(grpcConn) + + resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{}) + require.NoError(t, err) + require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status) + + // Make a streamRPC call + in, out, err := checkStreamRPC(t, streamRPCDial, repo) + require.NoError(t, err) + require.Equal(t, in, out, "byte stream works") + + return healthClient +} + +func registerStreamRPCServers(t *testing.T, srv *streamrpc.Server, cfg config.Cfg) { + gitalypb.RegisterTestStreamServiceServer(srv, teststream.NewServer(config.NewLocator(cfg))) +} + +func checkStreamRPC(t *testing.T, dial streamrpc.DialFunc, repo *gitalypb.Repository, opts ...streamrpc.CallOption) ([]byte, []byte, error) { + ctx, cancel := testhelper.Context() + defer cancel() + + const size = 1024 * 1024 + + in := make([]byte, size) + _, err := rand.Read(in) + require.NoError(t, err) + + var out []byte + require.NotEqual(t, in, out) + + err = streamrpc.Call( + ctx, + dial, + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: repo, + Size: size, + }, + func(c net.Conn) error { + errC := make(chan error, 1) + go func() { + var err error + out, err = ioutil.ReadAll(c) + errC <- err 
+ }() + + if _, err := io.Copy(c, bytes.NewReader(in)); err != nil { + return err + } + if err := <-errC; err != nil { + return err + } + + return nil + }, + opts..., + ) + return in, out, err +} diff --git a/internal/gitaly/service/blob/testhelper_test.go b/internal/gitaly/service/blob/testhelper_test.go index e7b64bd503b20d295ad589b655b634633301fdd2..a0de32432e9863910966706bddc0ed3852c710dd 100644 --- a/internal/gitaly/service/blob/testhelper_test.go +++ b/internal/gitaly/service/blob/testhelper_test.go @@ -34,7 +34,7 @@ func setup(t *testing.T) (config.Cfg, *gitalypb.Repository, string, gitalypb.Blo repo, repoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name()) t.Cleanup(cleanup) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterBlobServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/cleanup/testhelper_test.go b/internal/gitaly/service/cleanup/testhelper_test.go index d16a8bd4d645d5ff43151a84c3677deabe05fb43..8dc286effee28eb5885482984a8f43b5c4fb540c 100644 --- a/internal/gitaly/service/cleanup/testhelper_test.go +++ b/internal/gitaly/service/cleanup/testhelper_test.go @@ -37,7 +37,7 @@ func setupCleanupService(t *testing.T) (config.Cfg, *gitalypb.Repository, string } func runCleanupServiceServer(t *testing.T, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterCleanupServiceServer(srv, NewServer( deps.GetCfg(), deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/commit/testhelper_test.go b/internal/gitaly/service/commit/testhelper_test.go index 
28eef0f7876ce7688931b24183c5145888c39e90..a28fb70ff12c25796f5d17d6c19092ab7aac7960 100644 --- a/internal/gitaly/service/commit/testhelper_test.go +++ b/internal/gitaly/service/commit/testhelper_test.go @@ -66,7 +66,7 @@ func setupCommitServiceCreateRepo( func startTestServices(t testing.TB, cfg config.Cfg) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterCommitServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/conflicts/testhelper_test.go b/internal/gitaly/service/conflicts/testhelper_test.go index c8f14d617939742060711fd06075fbb60c3ee027..9b55622b243065ecc0e4a65df0c8d5223a925d23 100644 --- a/internal/gitaly/service/conflicts/testhelper_test.go +++ b/internal/gitaly/service/conflicts/testhelper_test.go @@ -84,7 +84,7 @@ func SetupConflictsService(t testing.TB, bare bool, hookManager hook.Manager) (c } func runConflictsServer(t testing.TB, cfg config.Cfg, hookManager hook.Manager) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterConflictsServiceServer(srv, NewServer( deps.GetCfg(), deps.GetHookManager(), diff --git a/internal/gitaly/service/diff/testhelper_test.go b/internal/gitaly/service/diff/testhelper_test.go index 1029cd3ed999b3a1ba80103df2091e9b297f3808..c787d35fc25429f22f90d7f5f5503192e2806988 100644 --- a/internal/gitaly/service/diff/testhelper_test.go +++ b/internal/gitaly/service/diff/testhelper_test.go @@ -27,7 +27,7 @@ func testMain(m *testing.M) int { func setupDiffService(t testing.TB, opt ...testserver.GitalyServerOpt) (config.Cfg, *gitalypb.Repository, string, gitalypb.DiffServiceClient) { cfg, repo, repoPath := 
testcfg.BuildWithRepo(t) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterDiffServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 82c396503b46e013d5f63905e3f6e351546a2045..753c2f7c93992f67fd5b5d8a8b6580d5569d1330 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -21,8 +22,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" + "google.golang.org/protobuf/types/known/emptypb" ) var ( @@ -59,7 +62,32 @@ func (s *server) PackObjectsHook(stream gitalypb.HookService_PackObjectsHookServ return helper.ErrInvalidArgumentf("invalid pack-objects command: %v: %w", firstRequest.Args, err) } - if err := s.packObjectsHook(stream, firstRequest, args); err != nil { + stdin := streamio.NewReader(func() ([]byte, error) { + resp, err := stream.Recv() + return resp.GetStdin(), err + }) + + output := func(r io.Reader) (int64, error) { + var n int64 + err := pktline.EachSidebandPacket(r, func(band byte, data []byte) error { + resp := &gitalypb.PackObjectsHookResponse{} + + switch band { + case bandStdout: + resp.Stdout = data + case bandStderr: + resp.Stderr = data + default: + 
return fmt.Errorf("invalid side band: %d", band) + } + + n += int64(len(data)) + return stream.Send(resp) + }) + return n, err + } + + if err := s.packObjectsHook(stream.Context(), firstRequest.Repository, firstRequest, args, stdin, output); err != nil { return helper.ErrInternal(err) } @@ -67,19 +95,18 @@ func (s *server) PackObjectsHook(stream gitalypb.HookService_PackObjectsHookServ } const ( + bandStdin = 0 bandStdout = 1 bandStderr = 2 ) -func (s *server) packObjectsHook(stream gitalypb.HookService_PackObjectsHookServer, firstRequest *gitalypb.PackObjectsHookRequest, args *packObjectsArgs) error { - ctx := stream.Context() - +func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, reqHash proto.Message, args *packObjectsArgs, stdinReader io.Reader, output func(io.Reader) (int64, error)) error { h := sha256.New() - if err := (&jsonpb.Marshaler{}).Marshal(h, firstRequest); err != nil { + if err := (&jsonpb.Marshaler{}).Marshal(h, reqHash); err != nil { return err } - stdin, err := bufferStdin(stream, h) + stdin, err := bufferStdin(stdinReader, h) if err != nil { return err } @@ -99,7 +126,7 @@ func (s *server) packObjectsHook(stream gitalypb.HookService_PackObjectsHookServ key := hex.EncodeToString(h.Sum(nil)) r, created, err := s.packObjectsCache.FindOrCreate(key, func(w io.Writer) error { - return s.runPackObjects(ctx, w, firstRequest.Repository, args, stdin, key) + return s.runPackObjects(ctx, w, repo, args, stdin, key) }) if err != nil { return err @@ -122,21 +149,8 @@ func (s *server) packObjectsHook(stream gitalypb.HookService_PackObjectsHookServ packObjectsServedBytes.Add(float64(servedBytes)) }() - if err := pktline.EachSidebandPacket(r, func(band byte, data []byte) error { - resp := &gitalypb.PackObjectsHookResponse{} - - switch band { - case bandStdout: - resp.Stdout = data - case bandStderr: - resp.Stderr = data - default: - return fmt.Errorf("invalid side band: %d", band) - } - - servedBytes += int64(len(data)) - return 
stream.Send(resp) - }); err != nil { + servedBytes, err = output(r) + if err != nil { return err } @@ -204,6 +218,10 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrBuf.String(), err) } + if err := pktline.WriteFlush(w); err != nil { + return err + } + return nil } @@ -274,7 +292,7 @@ func (p *packObjectsArgs) subcmd() git.SubCmd { return sc } -func bufferStdin(stream gitalypb.HookService_PackObjectsHookServer, h hash.Hash) (_ io.ReadCloser, err error) { +func bufferStdin(r io.Reader, h hash.Hash) (_ io.ReadCloser, err error) { f, err := ioutil.TempFile("", "PackObjectsHook-stdin") if err != nil { return nil, err @@ -289,15 +307,7 @@ func bufferStdin(stream gitalypb.HookService_PackObjectsHookServer, h hash.Hash) return nil, err } - stdin := io.TeeReader( - streamio.NewReader(func() ([]byte, error) { - resp, err := stream.Recv() - return resp.GetStdin(), err - }), - h, - ) - - _, err = io.Copy(f, stdin) + _, err = io.Copy(f, io.TeeReader(r, h)) if err != nil { return nil, err } @@ -319,3 +329,23 @@ func (cw *countingWriter) Write(p []byte) (int, error) { cw.N += int64(n) return n, err } + +func (s *server) PackObjectsHookStream(ctx context.Context, req *gitalypb.PackObjectsHookStreamRequest) (*emptypb.Empty, error) { + if req.GetRepository() == nil { + return nil, helper.ErrInvalidArgument(errors.New("repository is empty")) + } + + args, err := parsePackObjectsArgs(req.Args) + if err != nil { + return nil, helper.ErrInvalidArgumentf("invalid pack-objects command: %v: %w", req.Args, err) + } + + c, err := streamrpc.AcceptConnection(ctx) + if err != nil { + return nil, err + } + + stdin := pktline.SingleBandReader(c, bandStdin) + output := func(r io.Reader) (int64, error) { return io.Copy(c, r) } + return nil, s.packObjectsHook(ctx, req.Repository, req, args, stdin, output) +} diff --git a/internal/gitaly/service/hook/pack_objects_test.go 
b/internal/gitaly/service/hook/pack_objects_test.go index b6d3c987922779ce6dadd0bc8256acb40b32ade4..7fd0bbb76b9e127c3dceb5081d922f106ee6566d 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "net" "testing" "time" @@ -11,8 +12,10 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/streamcache" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -228,9 +231,12 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) { func TestServer_PackObjectsHook_usesCache(t *testing.T) { cfg, repo, repoPath := cfgWithCache(t) - tlc := &streamcache.TestLoggingCache{} + tlc := &streamcache.TestLoggingCache{Cache: streamcache.New( + cfg.PackObjectsCache.Dir, + cfg.PackObjectsCache.MaxAge.Duration(), + testhelper.DiscardTestLogger(t), + )} serverSocketPath := runHooksServer(t, cfg, []serverOption{func(s *server) { - tlc.Cache = s.packObjectsCache s.packObjectsCache = tlc }}) @@ -288,3 +294,193 @@ func TestServer_PackObjectsHook_usesCache(t *testing.T) { require.NoError(t, entries[i].Err) } } + +func TestServer_PackObjectsHookStream(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg, repo, repoPath := cfgWithCache(t) + + testCases := []struct { + desc string + stdin string + args []string + }{ + { + desc: "clone 1 branch", + stdin: "3dd08961455abf80ef9115f4afdc1c6f968b503c\n--not\n\n", + args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, + }, + { 
+ desc: "shallow clone 1 branch", + stdin: "--shallow 1e292f8fedd741b75372e19097c76d327140c312\n1e292f8fedd741b75372e19097c76d327140c312\n--not\n\n", + args: []string{"--shallow-file", "", "pack-objects", "--revs", "--thin", "--stdout", "--shallow", "--progress", "--delta-base-offset", "--include-tag"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + logger, hook := test.NewNullLogger() + serverSocketPath := runHooksServer(t, cfg, nil, + testserver.WithLogger(logger), + testserver.WithDisablePraefect(), // TODO remove after https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 + ) + + var packets []string + require.NoError(t, streamrpc.Call( + ctx, + streamrpc.DialNet(serverSocketPath), + "/gitaly.HookService/PackObjectsHookStream", + &gitalypb.PackObjectsHookStreamRequest{ + Repository: repo, + Args: tc.args, + }, + func(c net.Conn) error { + sw := pktline.NewSidebandWriter(c) + if _, err := io.WriteString(sw.Writer(0), tc.stdin); err != nil { + return err + } + if err := pktline.WriteFlush(c); err != nil { + return err + } + + scanner := pktline.NewScanner(c) + for scanner.Scan() { + packets = append(packets, scanner.Text()) + } + return scanner.Err() + }, + )) + + require.NotEmpty(t, packets) + last := len(packets) - 1 + require.Equal(t, "0000", packets[last]) + + var packdata []byte + for _, pkt := range packets[:last] { + if len(pkt) < 5 { + t.Fatalf("invalid packet: %q", pkt) + } + + switch band := pkt[4]; band { + case 1: + packdata = append(packdata, pkt[5:]...) 
+ case 2: + default: + t.Fatalf("unexpected band: %d", band) + } + } + + gittest.ExecStream( + t, + cfg, + bytes.NewReader(packdata), + "-C", repoPath, "index-pack", "--stdin", "--fix-thin", + ) + + for _, msg := range []string{"served bytes", "generated bytes"} { + t.Run(msg, func(t *testing.T) { + var entry *logrus.Entry + for _, e := range hook.AllEntries() { + if e.Message == msg { + entry = e + } + } + + require.NotNil(t, entry) + require.NotEmpty(t, entry.Data["cache_key"]) + require.Greater(t, entry.Data["bytes"], int64(0)) + }) + } + }) + } +} + +func TestServer_PackObjectsHookStream_errorSuppressesFlush(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg, repo, _ := cfgWithCache(t) + serverSocketPath := runHooksServer(t, cfg, nil, + testserver.WithDisablePraefect(), // TODO remove after https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 + ) + + var packets []string + require.NoError(t, streamrpc.Call( + ctx, + streamrpc.DialNet(serverSocketPath), + "/gitaly.HookService/PackObjectsHookStream", + &gitalypb.PackObjectsHookStreamRequest{ + Repository: repo, + Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, + }, + func(c net.Conn) error { + // Send input that will cause git pack-objects to fail + if _, err := pktline.WriteString(c, "\x00bad request"); err != nil { + return err + } + if err := pktline.WriteFlush(c); err != nil { + return err + } + + scanner := pktline.NewScanner(c) + for scanner.Scan() { + packets = append(packets, scanner.Text()) + } + return scanner.Err() + }, + )) + + // Because git pack-objects failed, there should be no flush packet. 
+ require.NotEmpty(t, packets) + for _, pkt := range packets { + require.False(t, pktline.IsFlush([]byte(pkt))) + } +} + +func TestServer_PackObjectsHookStream_invalidArgument(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) + serverSocketPath := runHooksServer(t, cfg, nil, + testserver.WithDisablePraefect(), // TODO remove after https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 + ) + + ctx, cancel := testhelper.Context() + defer cancel() + + testCases := []struct { + desc string + req *gitalypb.PackObjectsHookStreamRequest + msg string + }{ + { + desc: "empty", + req: &gitalypb.PackObjectsHookStreamRequest{}, + msg: "repository is empty", + }, + { + desc: "repo, no args", + req: &gitalypb.PackObjectsHookStreamRequest{Repository: repo}, + msg: "invalid pack-objects command", + }, + { + desc: "repo, bad args", + req: &gitalypb.PackObjectsHookStreamRequest{Repository: repo, Args: []string{"rm", "-rf"}}, + msg: "invalid pack-objects command", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + err := streamrpc.Call( + ctx, + streamrpc.DialNet(serverSocketPath), + "/gitaly.HookService/PackObjectsHookStream", + tc.req, + func(c net.Conn) error { panic("never reached") }, + ) + + require.Error(t, err) + require.Contains(t, err.Error(), tc.msg) + }) + } +} diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go index dcbdd56e207f6cc548617c8fcf5c8a77d0a5a98a..cd2630bfd355ccfbabed8298d082f343efae9490 100644 --- a/internal/gitaly/service/hook/testhelper_test.go +++ b/internal/gitaly/service/hook/testhelper_test.go @@ -55,7 +55,7 @@ type serverOption func(*server) func runHooksServer(t testing.TB, cfg config.Cfg, opts []serverOption, serverOpts ...testserver.GitalyServerOpt) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv 
grpc.ServiceRegistrar, deps *service.Dependencies) { hookServer := NewServer( deps.GetCfg(), gitalyhook.NewManager(deps.GetLocator(), deps.GetTxManager(), deps.GetGitlabClient(), deps.GetCfg()), diff --git a/internal/gitaly/service/internalgitaly/testhelper_test.go b/internal/gitaly/service/internalgitaly/testhelper_test.go index d768d2d80a93ee105b47c276349bac8fc46a1232..f092df9313a3b062ff8ac359084776e798b85d99 100644 --- a/internal/gitaly/service/internalgitaly/testhelper_test.go +++ b/internal/gitaly/service/internalgitaly/testhelper_test.go @@ -25,7 +25,7 @@ func testMain(m *testing.M) int { } func setupInternalGitalyService(t *testing.T, cfg config.Cfg, internalService gitalypb.InternalGitalyServer) gitalypb.InternalGitalyClient { - add := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + add := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterInternalGitalyServer(srv, internalService) }, testserver.WithDisablePraefect()) conn, err := grpc.Dial(add, grpc.WithInsecure()) diff --git a/internal/gitaly/service/namespace/testhelper_test.go b/internal/gitaly/service/namespace/testhelper_test.go index a37b5a03adc38b6daf7c69b380434d338d08c40c..a3851a41aa05b9356a5e60dcc4f56d5eba71e24a 100644 --- a/internal/gitaly/service/namespace/testhelper_test.go +++ b/internal/gitaly/service/namespace/testhelper_test.go @@ -17,7 +17,7 @@ func setupNamespaceService(t testing.TB, opts ...testserver.GitalyServerOpt) (co cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "other")) cfg := cfgBuilder.Build(t) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterNamespaceServiceServer(srv, NewServer(deps.GetLocator())) }, opts...) 
diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index 7777a638c3b784cda7ae9ab643270b61a35c9222..f1b70c22046b522c45d97cb7cc1e3a34dd512e6b 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -46,7 +46,7 @@ func setup(t *testing.T, opts ...testserver.GitalyServerOpt) (config.Cfg, *gital } func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, logger *logrus.Logger, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterObjectPoolServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index 3f082d51737f986af8319ee1fe7b0fd336165195..ceea9984281e23837259e0056ca697180184ecd5 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -122,7 +122,7 @@ func TestUserCreateBranchWithTransaction(t *testing.T) { transactionServer := &testTransactionServer{} cfg.ListenAddr = "127.0.0.1:0" // runs gitaly on the TCP address - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, @@ -483,7 +483,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps 
*service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, diff --git a/internal/gitaly/service/operations/tags_test.go b/internal/gitaly/service/operations/tags_test.go index 6b8adc277d3cf3496f358bc9043da5bb56dea751..3cc6b55473cc965e2bc502f220e66b44efe1faf2 100644 --- a/internal/gitaly/service/operations/tags_test.go +++ b/internal/gitaly/service/operations/tags_test.go @@ -275,7 +275,7 @@ func TestUserCreateTagWithTransaction(t *testing.T) { // runOperationServiceServer as it puts a Praefect server in between if // running Praefect tests, which would break our test setup. transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index cda7418f360a57e187d2c4dd3780672982422ed4..36fcb8124c2963c0f0e3cf6d8486f3d41b8880c7 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -103,7 +103,7 @@ func setupOperationsServiceWithRuby( func runOperationServiceServer(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server, options ...testserver.GitalyServerOpt) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go index 
ba28bf018a80f48393505f4c8bce161c1d0b0581..677e81d3e52f8b730010acecee2706b3cdf75f69 100644 --- a/internal/gitaly/service/ref/delete_refs_test.go +++ b/internal/gitaly/service/ref/delete_refs_test.go @@ -91,7 +91,7 @@ func TestDeleteRefs_transaction(t *testing.T) { }, } - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go index 55683f0957cc2739cf092887935a66b0f25461b3..eda08dc8bf3737f6ba56c2cf6ba46fb2c7133c31 100644 --- a/internal/gitaly/service/ref/testhelper_test.go +++ b/internal/gitaly/service/ref/testhelper_test.go @@ -70,7 +70,7 @@ func setupRefServiceWithoutRepo(t testing.TB) (config.Cfg, gitalypb.RefServiceCl } func runRefServiceServer(t testing.TB, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/remote/fetch_internal_remote_test.go b/internal/gitaly/service/remote/fetch_internal_remote_test.go index 35ec8e25c7aef089f2c069eb519177ed7306ce66..f142b6deccf3584005610824cc3c4fa7d81dc9fc 100644 --- a/internal/gitaly/service/remote/fetch_internal_remote_test.go +++ b/internal/gitaly/service/remote/fetch_internal_remote_test.go @@ -150,7 +150,7 @@ func TestSuccessfulFetchInternalRemote(t *testing.T) { testhelper.ConfigureGitalyHooksBin(t, remoteCfg) - remoteAddr := testserver.RunGitalyServer(t, remoteCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + remoteAddr := testserver.RunGitalyServer(t, 
remoteCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer( deps.GetCfg(), deps.GetLocator(), @@ -180,7 +180,7 @@ func TestSuccessfulFetchInternalRemote(t *testing.T) { getGitalySSHInvocationParams := listenGitalySSHCalls(t, localCfg) hookManager := &mockHookManager{} - localAddr := testserver.RunGitalyServer(t, localCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + localAddr := testserver.RunGitalyServer(t, localCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/remote/testhelper_test.go b/internal/gitaly/service/remote/testhelper_test.go index e7d3d0262868bdc52150a87f285180963031fb15..8962ebb40743079b94ddde58f6adbc8d2a20c8a6 100644 --- a/internal/gitaly/service/remote/testhelper_test.go +++ b/internal/gitaly/service/remote/testhelper_test.go @@ -63,7 +63,7 @@ func setupRemoteServiceWithRuby(t *testing.T, cfg config.Cfg, rubySrv *rubyserve repo, repoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name()) t.Cleanup(cleanup) - addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/remote/update_remote_mirror_test.go b/internal/gitaly/service/remote/update_remote_mirror_test.go index b29a8cb8095536a6c8c83fa39d59e58ee5f92564..17a19eb7069fcf9e00d4278f920e69856aac2074 100644 --- a/internal/gitaly/service/remote/update_remote_mirror_test.go +++ b/internal/gitaly/service/remote/update_remote_mirror_test.go @@ -494,7 +494,7 @@ func testUpdateRemoteMirrorFeatured(t *testing.T, ctx context.Context, cfg confi } } - addr := 
testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { cmdFactory := deps.GetGitCmdFactory() if tc.wrapCommandFactory != nil { cmdFactory = tc.wrapCommandFactory(t, deps.GetGitCmdFactory()) @@ -567,7 +567,7 @@ func testSuccessfulUpdateRemoteMirrorRequest(t *testing.T, cfg config.Cfg, rubyS } func testSuccessfulUpdateRemoteMirrorRequestFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -674,7 +674,7 @@ func testSuccessfulUpdateRemoteMirrorRequestWithWildcards(t *testing.T, cfg conf } func testSuccessfulUpdateRemoteMirrorRequestWithWildcardsFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -757,7 +757,7 @@ func testSuccessfulUpdateRemoteMirrorRequestWithWildcardsFeatured(t *testing.T, } func testUpdateRemoteMirrorInmemory(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, 
NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -830,7 +830,7 @@ func testSuccessfulUpdateRemoteMirrorRequestWithKeepDivergentRefs(t *testing.T, } func testSuccessfulUpdateRemoteMirrorRequestWithKeepDivergentRefsFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -923,7 +923,7 @@ func testFailedUpdateRemoteMirrorRequestDueToValidation(t *testing.T, cfg config } func testFailedUpdateRemoteMirrorRequestDueToValidationFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/apply_gitattributes_test.go b/internal/gitaly/service/repository/apply_gitattributes_test.go index a19ee3c2b29882686a2da407e2da780766c9d994..a3c093a9ae0c1d6bb32fff5b43f8062a00d1eb3e 100644 --- a/internal/gitaly/service/repository/apply_gitattributes_test.go +++ b/internal/gitaly/service/repository/apply_gitattributes_test.go @@ -90,7 +90,7 @@ func TestApplyGitattributesWithTransaction(t *testing.T) { cfg, repo, repoPath := testcfg.BuildWithRepo(t) transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { 
gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go index 8dc6751278df473c6dd60ea014337fc1933dbae9..34f3b84756f4e0e8b4ac86c6397356bee4ac0990 100644 --- a/internal/gitaly/service/repository/fetch_remote_test.go +++ b/internal/gitaly/service/repository/fetch_remote_test.go @@ -221,7 +221,7 @@ func TestFetchRemote_transaction(t *testing.T) { sourceCfg, _, sourceRepoPath := testcfg.BuildWithRepo(t) txManager := &mockTxManager{} - addr := testserver.RunGitalyServer(t, sourceCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, sourceCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/fork_test.go b/internal/gitaly/service/repository/fork_test.go index 8a387c6615aee790659b17423f2ec521b82e1840..76542080a37ab7a409b4f1869beef697220c3b50 100644 --- a/internal/gitaly/service/repository/fork_test.go +++ b/internal/gitaly/service/repository/fork_test.go @@ -28,6 +28,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -217,7 +218,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s registry := backchannel.NewRegistry() locator := config.NewLocator(cfg) cache := cache.New(cfg, locator) - server, err := gserver.New(true, cfg, testhelper.DiscardTestEntry(t), registry, cache) + server, err := 
gserver.New(true, cfg, testhelper.DiscardTestEntry(t), registry, cache, streamrpc.NewServer()) require.NoError(t, err) listener, addr := testhelper.GetLocalhostListener(t) @@ -239,7 +240,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s // protected by the same TLS certificate. cfg.TLS.KeyPath = "" - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory())) }) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index a892283936ab403d6e72bd4094802a5ef2500a76..a29096e3cf7f5339f5b436a55afd117049382337 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -392,7 +392,7 @@ func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { } func runServerWithBadFetchInternalRemote(t *testing.T, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go index 62be9ecc4a9ebcffabdf0e2c94dd20e072204607..4f85f87ce631e5bd134f650dc387def48ac3e2fb 100644 --- a/internal/gitaly/service/repository/testhelper_test.go +++ b/internal/gitaly/service/repository/testhelper_test.go @@ -119,7 +119,7 @@ func assertModTimeAfter(t *testing.T, afterTime time.Time, paths ...string) bool } func runRepositoryServerWithConfig(t testing.TB, cfg config.Cfg, rubySrv 
*rubyserver.Server, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( cfg, deps.GetRubyServer(), diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go index e253264cd5467e9441aa1258e8aa217b90ec0586..fa2102d34c784e06f71576c4e6eea454935ae89a 100644 --- a/internal/gitaly/service/server/info_test.go +++ b/internal/gitaly/service/server/info_test.go @@ -57,7 +57,7 @@ func TestGitalyServerInfo(t *testing.T) { } func runServer(t *testing.T, cfg config.Cfg, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterServerServiceServer(srv, NewServer(deps.GetGitCmdFactory(), deps.GetCfg().Storages)) }, opts...) 
} diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 31859d84306f59ac2a5a5a75d149bf2e79459d4d..827906f3a0a89aee920fcb0277cfa52fd48de406 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -21,6 +21,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/server" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/smarthttp" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/wiki" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" @@ -51,8 +52,8 @@ var ( ) ) -// RegisterAll will register all the known gRPC services on the provided gRPC service instance. -func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { +// RegisterAll will register all the known gRPC + StreamRPC services +func RegisterAll(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterBlobServiceServer(srv, blob.NewServer( deps.GetCfg(), deps.GetLocator(), @@ -143,6 +144,13 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(deps.GetCfg().Storages)) healthpb.RegisterHealthServer(srv, health.NewServer()) - reflection.Register(srv) - grpcprometheus.Register(srv) + + gitalypb.RegisterTestStreamServiceServer(srv, teststream.NewServer( + deps.GetLocator(), + )) + + if gs, ok := srv.(*grpc.Server); ok { + reflection.Register(gs) + grpcprometheus.Register(gs) + } } diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index 1ca4e84e7994c0be1b444d71c24d4da0b7e1d918..c83650b1af540fcd823e6351b6a485893116e8a9 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ 
-621,7 +621,7 @@ func TestPostReceiveWithReferenceTransactionHook(t *testing.T) { refTransactionServer := &testTransactionServer{} - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index b2639798539ffd1cba91234f8fce0f08ce6a3b93..2ee3d2ddc7fdeec3499cab9b710d162bf3bf8400 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -36,7 +36,7 @@ func testMain(m *testing.M) int { } func startSmartHTTPServer(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) testserver.GitalyServer { - return testserver.StartGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.StartGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/ssh/testhelper_test.go b/internal/gitaly/service/ssh/testhelper_test.go index d99c6b2abbabb8c9effc95f5ec7afced06604126..2695c7650366a0265c41d02e38cdf65862b20c56 100644 --- a/internal/gitaly/service/ssh/testhelper_test.go +++ b/internal/gitaly/service/ssh/testhelper_test.go @@ -31,7 +31,7 @@ func runSSHServer(t *testing.T, cfg config.Cfg, serverOpts ...testserver.GitalyS } func runSSHServerWithOptions(t *testing.T, cfg config.Cfg, opts []ServerOpt, serverOpts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { 
gitalypb.RegisterSSHServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/teststream/server.go b/internal/gitaly/service/teststream/server.go new file mode 100644 index 0000000000000000000000000000000000000000..51256b2267928e0f34870e166e28caa603249a90 --- /dev/null +++ b/internal/gitaly/service/teststream/server.go @@ -0,0 +1,37 @@ +package teststream + +import ( + "context" + "io" + + "gitlab.com/gitlab-org/gitaly/v14/internal/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +type server struct { + gitalypb.UnimplementedTestStreamServiceServer + locator storage.Locator +} + +func (s *server) TestStream(ctx context.Context, request *gitalypb.TestStreamRequest) (*emptypb.Empty, error) { + if _, err := s.locator.GetRepoPath(request.Repository); err != nil { + return nil, err + } + + c, err := streamrpc.AcceptConnection(ctx) + if err != nil { + return nil, err + } + + _, err = io.CopyN(c, c, request.Size) + return nil, err +} + +// NewServer creates a new instance of a grpc TestStreamServiceServer +func NewServer(locator storage.Locator) gitalypb.TestStreamServiceServer { + return &server{ + locator: locator, + } +} diff --git a/internal/gitaly/service/teststream/server_test.go b/internal/gitaly/service/teststream/server_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8051cf78758237cadbc5cda4548c44588eb80313 --- /dev/null +++ b/internal/gitaly/service/teststream/server_test.go @@ -0,0 +1,115 @@ +package teststream + +import ( + "bytes" + "io" + "io/ioutil" + "math/rand" + "net" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" 
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +func TestTestStreamPingPong(t *testing.T) { + const size = 1024 * 1024 + + addr, repo := runGitalyServer(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + in := make([]byte, size) + _, err := rand.Read(in) + require.NoError(t, err) + + var out []byte + require.NotEqual(t, in, out) + require.NoError(t, streamrpc.Call( + ctx, + streamrpc.DialNet(addr), + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: repo, + Size: size, + }, + func(c net.Conn) error { + errC := make(chan error, 1) + go func() { + var err error + out, err = ioutil.ReadAll(c) + errC <- err + }() + + if _, err := io.Copy(c, bytes.NewReader(in)); err != nil { + return err + } + if err := <-errC; err != nil { + return err + } + + return nil + }, + )) + + require.Equal(t, in, out, "byte stream works") +} + +func TestTestStreamPingPongWithInvalidRepo(t *testing.T) { + addr, repo := runGitalyServer(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + err := streamrpc.Call( + ctx, + streamrpc.DialNet(addr), + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: &gitalypb.Repository{ + StorageName: repo.StorageName, + RelativePath: "@hashed/94/00/notexist.git", + GlRepository: repo.GlRepository, + GlProjectPath: repo.GlProjectPath, + }, + Size: 1024 * 1024, + }, + func(c net.Conn) error { + panic("Should not reach here") + }, + ) + + require.Error(t, err) + require.Contains( + t, err.Error(), + "rpc error: code = NotFound desc = GetRepoPath: not a git repository", + ) +} + +func runGitalyServer(t *testing.T) (string, *gitalypb.Repository) { + t.Helper() + testhelper.Configure() + + cfg, repo, _ := testcfg.BuildWithRepo(t) + + addr := testserver.RunGitalyServer( + t, cfg, nil, + func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { + 
gitalypb.RegisterTestStreamServiceServer(srv, NewServer(deps.GetLocator())) + }, + // TODO: At the moment, stream RPC doesn't work well with Praefect, + // hence we have to disable Praefect. We can remove this option after + // https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 is + // done + testserver.WithDisablePraefect(), + ) + + return addr, repo +} diff --git a/internal/gitaly/service/wiki/testhelper_test.go b/internal/gitaly/service/wiki/testhelper_test.go index 85d2d1cc40c8c8b6c6f82d35fc0f416412996ee7..61ccfd3d1b8659f62a400323eeffbd2ba7978bdb 100644 --- a/internal/gitaly/service/wiki/testhelper_test.go +++ b/internal/gitaly/service/wiki/testhelper_test.go @@ -85,7 +85,7 @@ func TestWithRubySidecar(t *testing.T) { } func setupWikiService(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server) gitalypb.WikiServiceClient { - addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterWikiServiceServer(srv, NewServer(deps.GetRubyServer(), deps.GetLocator())) }) client := newWikiClient(t, addr) diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go index 028df3d20afce272054bbf6b273ed652f58d6ca4..6878a070838411e5e62775f2ae65f0c929909a31 100644 --- a/internal/gitaly/transaction/manager_test.go +++ b/internal/gitaly/transaction/manager_test.go @@ -191,7 +191,7 @@ func TestPoolManager_Stop(t *testing.T) { func runTransactionServer(t *testing.T, cfg config.Cfg) (*testTransactionServer, string) { transactionServer := &testTransactionServer{} cfg.ListenAddr = ":0" // pushes gRPC to listen on the TCP address - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { 
gitalypb.RegisterRefTransactionServer(srv, transactionServer) }, testserver.WithDisablePraefect()) return transactionServer, addr diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go index 8b9175f667847e209bde12170a535357a7ada2c1..55e0eb7206e775466c0de74b32e1ccd54ae31b44 100644 --- a/internal/metadata/featureflag/feature_flags.go +++ b/internal/metadata/featureflag/feature_flags.go @@ -24,6 +24,8 @@ var ( FindAllTagsPipeline = FeatureFlag{Name: "find_all_tags_pipeline", OnByDefault: false} // TxRemoveRepository enables transactionsal voting for the RemoveRepository RPC. TxRemoveRepository = FeatureFlag{Name: "tx_remove_repository", OnByDefault: false} + // PackObjectsHookStream enables StreamRPC in 'gitaly-hooks git pack-objects'. + PackObjectsHookStream = FeatureFlag{Name: "pack_objects_hook_stream", OnByDefault: false} ) // All includes all feature flags. @@ -36,4 +38,5 @@ var All = []FeatureFlag{ ReplicateRepositoryDirectFetch, FindAllTagsPipeline, TxRemoveRepository, + PackObjectsHookStream, } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index d7dcdd369e2704f91b8d1602661891433a3f2f20..914d32565a602a89a58c2c20dd0a3d4d62ae0188 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1451,7 +1451,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { wg: &wg, } - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, operationServer) }, testserver.WithDiskCache(&mockDiskCache{})) diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index 4fd1a69fbbb7ebb2a3494989a4e99d51632293f6..8e00abd6fd03ef347885fa0a24091079d1bfc7d9 100644 --- 
a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -31,7 +31,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { testRepo = repo } cfgs = append(cfgs, cfg) - cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv *grpc.Server, deps *service.Dependencies) { + cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b5675f8022da0280e8579920e381cc82aed1c8e2..a4fc45e8530594d1830a4f2b620611d003a4d6f4 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -570,7 +570,7 @@ func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) func runMockRepositoryServer(t *testing.T, cfg gconfig.Cfg) (*mockServer, string) { mockServer := newMockRepositoryServer() - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, mockServer) gitalypb.RegisterRefServiceServer(srv, mockServer) }) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 71812100e74e7da43bdd64a5f8366993b9a8d133..22da58a6b8ee37b602aac2c1d79a417d49adfffd 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -724,7 +724,7 @@ func (m *mockSmartHTTP) Called(method string) int { } func newSmartHTTPGrpcServer(t *testing.T, cfg gconfig.Cfg, smartHTTPService gitalypb.SmartHTTPServiceServer) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv 
grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, smartHTTPService) }, testserver.WithDisablePraefect()) } diff --git a/internal/streamrpc/handshaker.go b/internal/streamrpc/handshaker.go new file mode 100644 index 0000000000000000000000000000000000000000..54548af86d8efac453732bee0f01e14959309796 --- /dev/null +++ b/internal/streamrpc/handshaker.go @@ -0,0 +1,137 @@ +package streamrpc + +import ( + "crypto/tls" + "fmt" + "net" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" + "google.golang.org/grpc/credentials" +) + +// The magic bytes used for classification by listenmux +var magicBytes = []byte("streamrpc00") + +// DialNet lets Call initiate unencrypted connections. They tend to be used +// with Gitaly's listenmux multiplexer only. After the connection is +// established, streamrpc's 11-byte magic bytes are written into the wire. +// Listenmux peeks into these magic bytes and redirects the request to +// StreamRPC server. +// Please visit internal/listenmux/mux.go for more information +func DialNet(address string) DialFunc { + return func(t time.Duration) (net.Conn, error) { + endpoint, err := starter.ParseEndpoint(address) + if err != nil { + return nil, err + } + + // Dial-only deadline + deadline := time.Now().Add(t) + + dialer := &net.Dialer{Deadline: deadline} + conn, err := dialer.Dial(endpoint.Name, endpoint.Addr) + if err != nil { + return nil, err + } + + if err = conn.SetDeadline(deadline); err != nil { + return nil, err + } + // Write the magic bytes on the connection so the server knows we're + // about to initiate a multiplexing session. 
+ if _, err := conn.Write(magicBytes); err != nil { + return nil, fmt.Errorf("streamrpc client: write backchannel magic bytes: %w", err) + } + + // Reset deadline of the connection for later stages + if err = conn.SetDeadline(time.Time{}); err != nil { + return nil, err + } + + return conn, nil + } +} + +// DialTLS lets Call initiate TLS connections. Similar to DialNet, the +// connections are used for listenmux multiplexer. There are 3 steps involving: +// - TCP handshake +// - TLS handshake +// - Write streamrpc magic bytes +func DialTLS(address string, cfg *tls.Config) DialFunc { + return func(t time.Duration) (net.Conn, error) { + // Dial-only deadline + deadline := time.Now().Add(t) + + dialer := &net.Dialer{Deadline: deadline} + tlsConn, err := tls.DialWithDialer(dialer, "tcp", address, cfg) + if err != nil { + return nil, err + } + + err = tlsConn.SetDeadline(deadline) + if err != nil { + return nil, err + } + // Write the magic bytes on the connection so the server knows we're + // about to initiate a multiplexing session. + if _, err := tlsConn.Write(magicBytes); err != nil { + return nil, fmt.Errorf("streamrpc client: write backchannel magic bytes: %w", err) + } + + // Reset deadline of tls connection for later stages + if err = tlsConn.SetDeadline(time.Time{}); err != nil { + return nil, err + } + + return tlsConn, nil + } +} + +// ServerHandshaker implements the server side handshake of the multiplexed connection. +type ServerHandshaker struct { + server *Server + logger logrus.FieldLogger +} + +// NewServerHandshaker returns an implementation of streamrpc server +// handshaker. The provided TransportCredentials are handshaked prior to +// initializing the multiplexing session. This handshaker Gitaly's unary server +// interceptors into the interceptor chain of input StreamRPC server. 
+func NewServerHandshaker(server *Server, logger logrus.FieldLogger) *ServerHandshaker { + return &ServerHandshaker{ + server: server, + logger: logger, + } +} + +// Magic is used by listenmux to retrieve the magic string for +// streamrpc connections. +func (s *ServerHandshaker) Magic() string { return string(magicBytes) } + +// Handshake "steals" the request from Gitaly's main gRPC server during +// connection handshaking phase. Listenmux depends on the first 11-byte magic +// bytes sent by the client, and invoke StreamRPC handshaker accordingly. The +// request is then handled by stream RPC server, and skipped by Gitaly gRPC +// server. +func (s *ServerHandshaker) Handshake(conn net.Conn, authInfo credentials.AuthInfo) (net.Conn, credentials.AuthInfo, error) { + if err := conn.SetDeadline(time.Time{}); err != nil { + return nil, nil, err + } + + go func() { + if err := s.server.Handle(conn); err != nil { + s.logger.WithError(err).Error("streamrpc: handle call") + } + }() + // At this point, the connection is already closed. If the + // TransportCredentials continues its code path, gRPC constructs a HTTP2 + // server transport to handle the connection. Eventually, it fails and logs + // several warnings and errors even though the stream RPC call is + // successful. + // Fortunately, gRPC has credentials.ErrConnDispatched, indicating that the + // connection is already dispatched out of gRPC. gRPC should leave it alone + // and exit in peace. 
+ return nil, nil, credentials.ErrConnDispatched +} diff --git a/internal/streamrpc/rpc_test.go b/internal/streamrpc/rpc_test.go index 6692227808ee7b66e6488cbe56d04988d63b0095..3fe5ef529cd14d1b115a375ed8642b6673b3a2b4 100644 --- a/internal/streamrpc/rpc_test.go +++ b/internal/streamrpc/rpc_test.go @@ -188,18 +188,20 @@ func TestCall_serverMiddleware(t *testing.T) { ) interceptorDone := make(chan struct{}) + server := NewServer() + server.UseInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer close(interceptorDone) + middlewareMethod = info.FullMethod + receivedField = req.(*testpb.StreamRequest).StringField + if md, ok := metadata.FromIncomingContext(ctx); ok { + receivedValues = md[testKey] + } + return handler(ctx, req) + }) dial := startServer( t, - NewServer(WithServerInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - defer close(interceptorDone) - middlewareMethod = info.FullMethod - receivedField = req.(*testpb.StreamRequest).StringField - if md, ok := metadata.FromIncomingContext(ctx); ok { - receivedValues = md[testKey] - } - return handler(ctx, req) - })), + server, func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { _, err := AcceptConnection(ctx) return nil, err @@ -222,15 +224,11 @@ func TestCall_serverMiddleware(t *testing.T) { } func TestCall_serverMiddlewareReject(t *testing.T) { - dial := startServer( - t, - NewServer(WithServerInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - return nil, errors.New("middleware says no") - })), - func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { - panic("never reached") - }, - ) + server := NewServer() + server.UseInterceptor(func(ctx context.Context, req interface{}, info 
*grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return nil, errors.New("middleware says no") + }) + dial := startServer(t, server, func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { panic("never reached") }) err := Call( context.Background(), diff --git a/internal/streamrpc/server.go b/internal/streamrpc/server.go index 502764fd47ff5a36add567356906d7b024b1bc38..2c4771af5fb20768a262a37befd9bdefcd6e6682 100644 --- a/internal/streamrpc/server.go +++ b/internal/streamrpc/server.go @@ -30,11 +30,6 @@ type method struct { // options to NewServer. type ServerOption func(*Server) -// WithServerInterceptor adds a unary gRPC server interceptor. -func WithServerInterceptor(interceptor grpc.UnaryServerInterceptor) ServerOption { - return func(s *Server) { s.interceptor = interceptor } -} - // NewServer returns a new StreamRPC server. You can pass the result to // grpc-go RegisterFooServer functions. func NewServer(opts ...ServerOption) *Server { @@ -60,6 +55,12 @@ func (s *Server) RegisterService(sd *grpc.ServiceDesc, impl interface{}) { } } +// UseInterceptor adds a unary gRPC server interceptor for the StreamRPC +// server to use. +func (s *Server) UseInterceptor(interceptor grpc.UnaryServerInterceptor) { + s.interceptor = interceptor +} + // Handle handles an incoming network connection with the StreamRPC // protocol. It is intended to be called from a net.Listener.Accept loop // (or something equivalent). 
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index df54fa39c5d6b1c14506bcd673e6cd15c1ad054c..399aed5371ae96c1f85720a50dd89e7906617d9c 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -41,7 +41,7 @@ import ( // It accepts addition Registrar to register all required service instead of // calling service.RegisterAll explicitly because it creates a circular dependency // when the function is used in on of internal/gitaly/service/... packages. -func RunGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) string { +func RunGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) string { _, gitalyAddr, disablePraefect := runGitaly(t, cfg, rubyServer, registrar, opts...) praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") @@ -135,7 +135,7 @@ func (gs GitalyServer) Address() string { } // StartGitalyServer creates and runs gitaly (and praefect as a proxy) server. -func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) GitalyServer { +func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) GitalyServer { gitalySrv, gitalyAddr, disablePraefect := runGitaly(t, cfg, rubyServer, registrar, opts...) 
praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") @@ -199,7 +199,7 @@ func IsHealthy(conn *grpc.ClientConn, timeout time.Duration) bool { return true } -func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { +func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { t.Helper() var gsd gitalyServerDeps @@ -210,20 +210,21 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi deps := gsd.createDependencies(t, cfg, rubyServer) t.Cleanup(func() { gsd.conns.Close() }) - srv, err := server.NewGitalyServerFactory( + grpcSrv, srpcSrv, err := server.NewGitalyServerFactory( cfg, gsd.logger.WithField("test", t.Name()), deps.GetBackchannelRegistry(), deps.GetDiskCache(), ).CreateExternal(cfg.TLS.CertPath != "" && cfg.TLS.KeyPath != "") require.NoError(t, err) - t.Cleanup(srv.Stop) + t.Cleanup(grpcSrv.Stop) - registrar(srv, deps) - if _, found := srv.GetServiceInfo()["grpc.health.v1.Health"]; !found { + registrar(grpcSrv, deps) + registrar(srpcSrv, deps) + if _, found := grpcSrv.GetServiceInfo()["grpc.health.v1.Health"]; !found { // we should register health service as it is used for the health checks // praefect service executes periodically (and on the bootstrap step) - healthpb.RegisterHealthServer(srv, health.NewServer()) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) } // listen on internal socket @@ -243,7 +244,7 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi internalListener, err := net.Listen("unix", cfg.GitalyInternalSocketPath()) require.NoError(t, err) - go srv.Serve(internalListener) + go grpcSrv.Serve(internalListener) } var listener net.Listener @@ -266,9 +267,9 @@ func runGitaly(t testing.TB, 
cfg config.Cfg, rubyServer *rubyserver.Server, regi addr = "unix://" + serverSocketPath } - go srv.Serve(listener) + go grpcSrv.Serve(listener) - return srv, addr, gsd.disablePraefect + return grpcSrv, addr, gsd.disablePraefect } type gitalyServerDeps struct { diff --git a/proto/go/gitalypb/hook.pb.go b/proto/go/gitalypb/hook.pb.go index a25bf0254845095f7900ce447b511762bc42bb27..080765af7ceaf9c2983c1dcfa90f4801bd234364 100644 --- a/proto/go/gitalypb/hook.pb.go +++ b/proto/go/gitalypb/hook.pb.go @@ -9,6 +9,7 @@ package gitalypb import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" reflect "reflect" sync "sync" ) @@ -735,149 +736,219 @@ func (x *PackObjectsHookResponse) GetStderr() []byte { return nil } +type PackObjectsHookStreamRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` + // args contains the arguments passed to the pack-objects hook, without the leading "git" + Args []string `protobuf:"bytes,2,rep,name=args,proto3" json:"args,omitempty"` +} + +func (x *PackObjectsHookStreamRequest) Reset() { + *x = PackObjectsHookStreamRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_hook_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PackObjectsHookStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PackObjectsHookStreamRequest) ProtoMessage() {} + +func (x *PackObjectsHookStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_hook_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return 
mi.MessageOf(x) +} + +// Deprecated: Use PackObjectsHookStreamRequest.ProtoReflect.Descriptor instead. +func (*PackObjectsHookStreamRequest) Descriptor() ([]byte, []int) { + return file_hook_proto_rawDescGZIP(), []int{10} +} + +func (x *PackObjectsHookStreamRequest) GetRepository() *Repository { + if x != nil { + return x.Repository + } + return nil +} + +func (x *PackObjectsHookStreamRequest) GetArgs() []string { + if x != nil { + return x.Args + } + return nil +} + var File_hook_proto protoreflect.FileDescriptor var file_hook_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x68, 0x6f, 0x6f, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x0a, 0x6c, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc6, - 0x01, 0x0a, 0x15, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, - 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, - 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, - 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, - 0x10, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x67, 0x69, 0x74, 
0x50, 0x75, 0x73, 0x68, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x50, 0x72, 0x65, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, - 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, - 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0xc7, 0x01, 0x0a, 0x16, 0x50, 0x6f, 0x73, 0x74, 0x52, - 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, - 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, - 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, - 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, - 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, - 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, - 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x75, - 0x73, 0x68, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, - 0x52, 0x0e, 
0x67, 0x69, 0x74, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x22, 0x7e, 0x0a, 0x17, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, - 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, - 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, - 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x22, 0xce, 0x01, 0x0a, 0x11, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, - 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, - 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, - 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, - 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x03, 0x72, 0x65, 0x66, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x6c, 0x64, 0x5f, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x6c, 0x64, 0x56, - 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1b, 0x0a, 0x09, 
0x6e, 0x65, 0x77, 0x5f, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x22, 0x79, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, - 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, - 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x9e, 0x02, 0x0a, - 0x1f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, - 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, - 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, - 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, - 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, - 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, - 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, - 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x43, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 
0x18, 0x04, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, + 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc6, 0x01, 0x0a, 0x15, + 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, + 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, + 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, + 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, + 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, + 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, + 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x67, 0x69, + 0x74, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x05, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x67, 0x69, 0x74, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, + 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, + 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, + 0x18, 0x02, 0x20, 0x01, 
0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, + 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x22, 0xc7, 0x01, 0x0a, 0x16, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, + 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, + 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, + 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, + 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, + 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, + 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, + 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x5f, + 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x67, + 0x69, 0x74, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7e, 0x0a, + 0x17, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, + 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 
0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, + 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0xce, 0x01, + 0x0a, 0x11, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, + 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, + 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, + 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, + 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, + 0x65, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x03, 0x72, 0x65, 0x66, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x6c, 0x64, 0x5f, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x6c, 0x64, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x79, + 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, + 
0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, + 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, + 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x9e, 0x02, 0x0a, 0x1f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x31, 0x0a, 0x05, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x52, 0x45, 0x50, 0x41, 0x52, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x54, 0x45, 0x44, 0x10, 0x01, - 0x12, 0x0b, 0x0a, 0x07, 0x41, 0x42, 0x4f, 0x52, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0x87, 0x01, - 0x0a, 0x20, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, - 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, - 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x7c, 0x0a, 0x16, 0x50, 0x61, 0x63, 0x6b, 0x4f, - 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 
0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, - 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, - 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x61, - 0x72, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, - 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, - 0x73, 0x74, 0x64, 0x69, 0x6e, 0x22, 0x49, 0x0a, 0x17, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, - 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, - 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, - 0x32, 0xf4, 0x03, 0x0a, 0x0b, 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x5b, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, - 0x6f, 0x6b, 0x12, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, - 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5e, 0x0a, - 0x0f, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, - 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 
0x74, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4d, 0x0a, - 0x0a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x19, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x30, 0x01, 0x12, 0x79, 0x0a, 0x18, - 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x27, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x28, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, + 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, + 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, + 0x18, 0x02, 0x20, 
0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, + 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, + 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, + 0x69, 0x6e, 0x12, 0x43, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x2d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5e, 0x0a, 0x0f, 0x50, 0x61, 0x63, 0x6b, 0x4f, - 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, - 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x31, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x52, 0x45, 0x50, 0x41, 0x52, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, + 0x0a, 0x09, 0x43, 0x4f, 
0x4d, 0x4d, 0x49, 0x54, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, + 0x07, 0x41, 0x42, 0x4f, 0x52, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0x87, 0x01, 0x0a, 0x20, 0x52, + 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, + 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, + 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, + 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x22, 0x7c, 0x0a, 0x16, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, + 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, + 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, + 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x14, 0x0a, 0x05, + 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, + 0x69, 0x6e, 0x22, 0x49, 0x0a, 0x17, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, + 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 
0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, + 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x22, 0x6c, 0x0a, + 0x1c, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, + 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x32, 0xd3, 0x04, 0x0a, 0x0b, + 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5b, 0x0a, 0x0e, 0x50, + 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1d, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, + 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, + 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5e, 0x0a, 0x0f, 0x50, 0x6f, 0x73, 0x74, + 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1e, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 
0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, + 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4d, 0x0a, 0x0a, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, + 0x97, 0x28, 0x02, 0x08, 0x02, 0x30, 0x01, 0x12, 0x79, 0x0a, 0x18, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, + 0x6f, 0x6f, 0x6b, 0x12, 0x27, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, + 0x30, 0x01, 0x12, 0x5e, 0x0a, 0x0f, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, + 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, + 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, + 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, + 0x30, 0x01, 0x12, 0x5d, 0x0a, 0x15, 
0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, + 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x24, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, + 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, + 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -893,7 +964,7 @@ func file_hook_proto_rawDescGZIP() []byte { } var file_hook_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_hook_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_hook_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_hook_proto_goTypes = []interface{}{ (ReferenceTransactionHookRequest_State)(0), // 0: gitaly.ReferenceTransactionHookRequest.State (*PreReceiveHookRequest)(nil), // 1: gitaly.PreReceiveHookRequest @@ -906,35 +977,40 @@ var file_hook_proto_goTypes = []interface{}{ (*ReferenceTransactionHookResponse)(nil), // 8: gitaly.ReferenceTransactionHookResponse (*PackObjectsHookRequest)(nil), // 9: gitaly.PackObjectsHookRequest (*PackObjectsHookResponse)(nil), // 10: gitaly.PackObjectsHookResponse - (*Repository)(nil), // 11: gitaly.Repository - (*ExitStatus)(nil), // 12: gitaly.ExitStatus + (*PackObjectsHookStreamRequest)(nil), // 11: gitaly.PackObjectsHookStreamRequest + (*Repository)(nil), // 12: gitaly.Repository + (*ExitStatus)(nil), // 13: gitaly.ExitStatus + (*emptypb.Empty)(nil), 
// 14: google.protobuf.Empty } var file_hook_proto_depIdxs = []int32{ - 11, // 0: gitaly.PreReceiveHookRequest.repository:type_name -> gitaly.Repository - 12, // 1: gitaly.PreReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 2: gitaly.PostReceiveHookRequest.repository:type_name -> gitaly.Repository - 12, // 3: gitaly.PostReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 4: gitaly.UpdateHookRequest.repository:type_name -> gitaly.Repository - 12, // 5: gitaly.UpdateHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 6: gitaly.ReferenceTransactionHookRequest.repository:type_name -> gitaly.Repository + 12, // 0: gitaly.PreReceiveHookRequest.repository:type_name -> gitaly.Repository + 13, // 1: gitaly.PreReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 2: gitaly.PostReceiveHookRequest.repository:type_name -> gitaly.Repository + 13, // 3: gitaly.PostReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 4: gitaly.UpdateHookRequest.repository:type_name -> gitaly.Repository + 13, // 5: gitaly.UpdateHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 6: gitaly.ReferenceTransactionHookRequest.repository:type_name -> gitaly.Repository 0, // 7: gitaly.ReferenceTransactionHookRequest.state:type_name -> gitaly.ReferenceTransactionHookRequest.State - 12, // 8: gitaly.ReferenceTransactionHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 9: gitaly.PackObjectsHookRequest.repository:type_name -> gitaly.Repository - 1, // 10: gitaly.HookService.PreReceiveHook:input_type -> gitaly.PreReceiveHookRequest - 3, // 11: gitaly.HookService.PostReceiveHook:input_type -> gitaly.PostReceiveHookRequest - 5, // 12: gitaly.HookService.UpdateHook:input_type -> gitaly.UpdateHookRequest - 7, // 13: gitaly.HookService.ReferenceTransactionHook:input_type -> gitaly.ReferenceTransactionHookRequest - 9, // 14: gitaly.HookService.PackObjectsHook:input_type -> 
gitaly.PackObjectsHookRequest - 2, // 15: gitaly.HookService.PreReceiveHook:output_type -> gitaly.PreReceiveHookResponse - 4, // 16: gitaly.HookService.PostReceiveHook:output_type -> gitaly.PostReceiveHookResponse - 6, // 17: gitaly.HookService.UpdateHook:output_type -> gitaly.UpdateHookResponse - 8, // 18: gitaly.HookService.ReferenceTransactionHook:output_type -> gitaly.ReferenceTransactionHookResponse - 10, // 19: gitaly.HookService.PackObjectsHook:output_type -> gitaly.PackObjectsHookResponse - 15, // [15:20] is the sub-list for method output_type - 10, // [10:15] is the sub-list for method input_type - 10, // [10:10] is the sub-list for extension type_name - 10, // [10:10] is the sub-list for extension extendee - 0, // [0:10] is the sub-list for field type_name + 13, // 8: gitaly.ReferenceTransactionHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 9: gitaly.PackObjectsHookRequest.repository:type_name -> gitaly.Repository + 12, // 10: gitaly.PackObjectsHookStreamRequest.repository:type_name -> gitaly.Repository + 1, // 11: gitaly.HookService.PreReceiveHook:input_type -> gitaly.PreReceiveHookRequest + 3, // 12: gitaly.HookService.PostReceiveHook:input_type -> gitaly.PostReceiveHookRequest + 5, // 13: gitaly.HookService.UpdateHook:input_type -> gitaly.UpdateHookRequest + 7, // 14: gitaly.HookService.ReferenceTransactionHook:input_type -> gitaly.ReferenceTransactionHookRequest + 9, // 15: gitaly.HookService.PackObjectsHook:input_type -> gitaly.PackObjectsHookRequest + 11, // 16: gitaly.HookService.PackObjectsHookStream:input_type -> gitaly.PackObjectsHookStreamRequest + 2, // 17: gitaly.HookService.PreReceiveHook:output_type -> gitaly.PreReceiveHookResponse + 4, // 18: gitaly.HookService.PostReceiveHook:output_type -> gitaly.PostReceiveHookResponse + 6, // 19: gitaly.HookService.UpdateHook:output_type -> gitaly.UpdateHookResponse + 8, // 20: gitaly.HookService.ReferenceTransactionHook:output_type -> gitaly.ReferenceTransactionHookResponse + 10, // 
21: gitaly.HookService.PackObjectsHook:output_type -> gitaly.PackObjectsHookResponse + 14, // 22: gitaly.HookService.PackObjectsHookStream:output_type -> google.protobuf.Empty + 17, // [17:23] is the sub-list for method output_type + 11, // [11:17] is the sub-list for method input_type + 11, // [11:11] is the sub-list for extension type_name + 11, // [11:11] is the sub-list for extension extendee + 0, // [0:11] is the sub-list for field type_name } func init() { file_hook_proto_init() } @@ -1065,6 +1141,18 @@ func file_hook_proto_init() { return nil } } + file_hook_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PackObjectsHookStreamRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1072,7 +1160,7 @@ func file_hook_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_hook_proto_rawDesc, NumEnums: 1, - NumMessages: 10, + NumMessages: 11, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/go/gitalypb/hook_grpc.pb.go b/proto/go/gitalypb/hook_grpc.pb.go index 7efe8249f25defb6add419aabea325557c9652cc..a494223ecd26a656f22feb100a8a5fc9213caf84 100644 --- a/proto/go/gitalypb/hook_grpc.pb.go +++ b/proto/go/gitalypb/hook_grpc.pb.go @@ -7,6 +7,7 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" ) // This is a compile-time assertion to ensure that this generated file @@ -26,6 +27,7 @@ type HookServiceClient interface { // uploadpack.packObjectsHook mechanism. It generates a stream of packed // Git objects. 
PackObjectsHook(ctx context.Context, opts ...grpc.CallOption) (HookService_PackObjectsHookClient, error) + PackObjectsHookStream(ctx context.Context, in *PackObjectsHookStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type hookServiceClient struct { @@ -192,6 +194,15 @@ func (x *hookServicePackObjectsHookClient) Recv() (*PackObjectsHookResponse, err return m, nil } +func (c *hookServiceClient) PackObjectsHookStream(ctx context.Context, in *PackObjectsHookStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/gitaly.HookService/PackObjectsHookStream", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // HookServiceServer is the server API for HookService service. // All implementations must embed UnimplementedHookServiceServer // for forward compatibility @@ -204,6 +215,7 @@ type HookServiceServer interface { // uploadpack.packObjectsHook mechanism. It generates a stream of packed // Git objects. PackObjectsHook(HookService_PackObjectsHookServer) error + PackObjectsHookStream(context.Context, *PackObjectsHookStreamRequest) (*emptypb.Empty, error) mustEmbedUnimplementedHookServiceServer() } @@ -226,6 +238,9 @@ func (UnimplementedHookServiceServer) ReferenceTransactionHook(HookService_Refer func (UnimplementedHookServiceServer) PackObjectsHook(HookService_PackObjectsHookServer) error { return status.Errorf(codes.Unimplemented, "method PackObjectsHook not implemented") } +func (UnimplementedHookServiceServer) PackObjectsHookStream(context.Context, *PackObjectsHookStreamRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method PackObjectsHookStream not implemented") +} func (UnimplementedHookServiceServer) mustEmbedUnimplementedHookServiceServer() {} // UnsafeHookServiceServer may be embedded to opt out of forward compatibility for this service. 
@@ -364,13 +379,36 @@ func (x *hookServicePackObjectsHookServer) Recv() (*PackObjectsHookRequest, erro return m, nil } +func _HookService_PackObjectsHookStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PackObjectsHookStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HookServiceServer).PackObjectsHookStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.HookService/PackObjectsHookStream", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HookServiceServer).PackObjectsHookStream(ctx, req.(*PackObjectsHookStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + // HookService_ServiceDesc is the grpc.ServiceDesc for HookService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var HookService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.HookService", HandlerType: (*HookServiceServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "PackObjectsHookStream", + Handler: _HookService_PackObjectsHookStream_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "PreReceiveHook", diff --git a/proto/go/gitalypb/protolist.go b/proto/go/gitalypb/protolist.go index a15916f7044c5f2a915ae137ecd7dbeec0dfc489..9d26e24a784506a9b84de3bcb03c1f7bffe9c541 100644 --- a/proto/go/gitalypb/protolist.go +++ b/proto/go/gitalypb/protolist.go @@ -23,6 +23,7 @@ var GitalyProtos = []string{ "shared.proto", "smarthttp.proto", "ssh.proto", + "teststream.proto", "transaction.proto", "wiki.proto", } diff --git a/proto/go/gitalypb/teststream.pb.go b/proto/go/gitalypb/teststream.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..d6a68abce3e6c3e0e313adc78b32324b9ca43423 --- /dev/null +++ 
b/proto/go/gitalypb/teststream.pb.go @@ -0,0 +1,173 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.17.3 +// source: teststream.proto + +package gitalypb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type TestStreamRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` + Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` +} + +func (x *TestStreamRequest) Reset() { + *x = TestStreamRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_teststream_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestStreamRequest) ProtoMessage() {} + +func (x *TestStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_teststream_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestStreamRequest.ProtoReflect.Descriptor instead. 
+func (*TestStreamRequest) Descriptor() ([]byte, []int) { + return file_teststream_proto_rawDescGZIP(), []int{0} +} + +func (x *TestStreamRequest) GetRepository() *Repository { + if x != nil { + return x.Repository + } + return nil +} + +func (x *TestStreamRequest) GetSize() int64 { + if x != nil { + return x.Size + } + return 0 +} + +var File_teststream_proto protoreflect.FileDescriptor + +var file_teststream_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x74, 0x65, 0x73, 0x74, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x0a, 0x6c, 0x69, 0x6e, 0x74, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x61, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, + 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, + 0x73, 0x69, 0x7a, 0x65, 0x32, 0x5c, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x47, 0x0a, 0x0a, 0x54, 0x65, 0x73, + 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 
0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, + 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_teststream_proto_rawDescOnce sync.Once + file_teststream_proto_rawDescData = file_teststream_proto_rawDesc +) + +func file_teststream_proto_rawDescGZIP() []byte { + file_teststream_proto_rawDescOnce.Do(func() { + file_teststream_proto_rawDescData = protoimpl.X.CompressGZIP(file_teststream_proto_rawDescData) + }) + return file_teststream_proto_rawDescData +} + +var file_teststream_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_teststream_proto_goTypes = []interface{}{ + (*TestStreamRequest)(nil), // 0: gitaly.TestStreamRequest + (*Repository)(nil), // 1: gitaly.Repository + (*emptypb.Empty)(nil), // 2: google.protobuf.Empty +} +var file_teststream_proto_depIdxs = []int32{ + 1, // 0: gitaly.TestStreamRequest.repository:type_name -> gitaly.Repository + 0, // 1: gitaly.TestStreamService.TestStream:input_type -> gitaly.TestStreamRequest + 2, // 2: gitaly.TestStreamService.TestStream:output_type -> google.protobuf.Empty + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_teststream_proto_init() } +func file_teststream_proto_init() { + if File_teststream_proto != nil { + return + } + file_lint_proto_init() + file_shared_proto_init() + if !protoimpl.UnsafeEnabled { + file_teststream_proto_msgTypes[0].Exporter = func(v 
interface{}, i int) interface{} { + switch v := v.(*TestStreamRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_teststream_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_teststream_proto_goTypes, + DependencyIndexes: file_teststream_proto_depIdxs, + MessageInfos: file_teststream_proto_msgTypes, + }.Build() + File_teststream_proto = out.File + file_teststream_proto_rawDesc = nil + file_teststream_proto_goTypes = nil + file_teststream_proto_depIdxs = nil +} diff --git a/proto/go/gitalypb/teststream_grpc.pb.go b/proto/go/gitalypb/teststream_grpc.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..e8b4c5fcb68c1776b26faeb7b279392e980c7c2a --- /dev/null +++ b/proto/go/gitalypb/teststream_grpc.pb.go @@ -0,0 +1,102 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package gitalypb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// TestStreamServiceClient is the client API for TestStreamService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
+type TestStreamServiceClient interface { + TestStream(ctx context.Context, in *TestStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type testStreamServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTestStreamServiceClient(cc grpc.ClientConnInterface) TestStreamServiceClient { + return &testStreamServiceClient{cc} +} + +func (c *testStreamServiceClient) TestStream(ctx context.Context, in *TestStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/gitaly.TestStreamService/TestStream", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TestStreamServiceServer is the server API for TestStreamService service. +// All implementations must embed UnimplementedTestStreamServiceServer +// for forward compatibility +type TestStreamServiceServer interface { + TestStream(context.Context, *TestStreamRequest) (*emptypb.Empty, error) + mustEmbedUnimplementedTestStreamServiceServer() +} + +// UnimplementedTestStreamServiceServer must be embedded to have forward compatible implementations. +type UnimplementedTestStreamServiceServer struct { +} + +func (UnimplementedTestStreamServiceServer) TestStream(context.Context, *TestStreamRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestStream not implemented") +} +func (UnimplementedTestStreamServiceServer) mustEmbedUnimplementedTestStreamServiceServer() {} + +// UnsafeTestStreamServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TestStreamServiceServer will +// result in compilation errors. 
+type UnsafeTestStreamServiceServer interface { + mustEmbedUnimplementedTestStreamServiceServer() +} + +func RegisterTestStreamServiceServer(s grpc.ServiceRegistrar, srv TestStreamServiceServer) { + s.RegisterService(&TestStreamService_ServiceDesc, srv) +} + +func _TestStreamService_TestStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TestStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestStreamServiceServer).TestStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.TestStreamService/TestStream", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestStreamServiceServer).TestStream(ctx, req.(*TestStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// TestStreamService_ServiceDesc is the grpc.ServiceDesc for TestStreamService service. 
+// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TestStreamService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "gitaly.TestStreamService", + HandlerType: (*TestStreamServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "TestStream", + Handler: _TestStreamService_TestStream_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "teststream.proto", +} diff --git a/proto/hook.proto b/proto/hook.proto index 71dd75cd64bb8ab7c587a40f6a3a2e5455fb7290..efde832be477124b475bde196831505ea66c389c 100644 --- a/proto/hook.proto +++ b/proto/hook.proto @@ -6,6 +6,7 @@ option go_package = "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"; import "lint.proto"; import "shared.proto"; +import "google/protobuf/empty.proto"; service HookService { rpc PreReceiveHook(stream PreReceiveHookRequest) returns (stream PreReceiveHookResponse) { @@ -36,6 +37,11 @@ service HookService { op: ACCESSOR }; } + rpc PackObjectsHookStream(PackObjectsHookStreamRequest) returns (google.protobuf.Empty) { + option (op_type) = { + op: ACCESSOR + }; + } } message PreReceiveHookRequest { @@ -110,3 +116,9 @@ message PackObjectsHookResponse { // stderr contains progress messages (such as "Enumerating objects ...") bytes stderr = 2; } + +message PackObjectsHookStreamRequest { + Repository repository = 1 [(target_repository)=true]; + // args contains the arguments passed to the pack-objects hook, without the leading "git" + repeated string args = 2; +} diff --git a/proto/teststream.proto b/proto/teststream.proto new file mode 100644 index 0000000000000000000000000000000000000000..734047887f708a247d6301041ea76addcc2d8a26 --- /dev/null +++ b/proto/teststream.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package gitaly; + +option go_package = "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"; + +import "lint.proto"; +import "shared.proto"; +import "google/protobuf/empty.proto"; + +service TestStreamService 
{ + rpc TestStream(TestStreamRequest) returns (google.protobuf.Empty) { + option (op_type) = { + op: ACCESSOR + }; + } +} + +message TestStreamRequest { + Repository repository = 1 [(target_repository)=true]; + int64 size = 2; +} diff --git a/ruby/proto/gitaly.rb b/ruby/proto/gitaly.rb index 9c80cea63b406246751f0c22dcf0d0c007d4a7bc..9cfff81cf0f07d90af86272847a82b6d2279196d 100644 --- a/ruby/proto/gitaly.rb +++ b/ruby/proto/gitaly.rb @@ -37,6 +37,8 @@ require 'gitaly/smarthttp_services_pb' require 'gitaly/ssh_services_pb' +require 'gitaly/teststream_services_pb' + require 'gitaly/transaction_services_pb' require 'gitaly/wiki_services_pb' diff --git a/ruby/proto/gitaly/hook_pb.rb b/ruby/proto/gitaly/hook_pb.rb index 20c8cb8b18995a447163662a79ff64fd34e0f4ca..8a8d4b87cdbcd489ba3e0abfb50491ff5d97f06e 100644 --- a/ruby/proto/gitaly/hook_pb.rb +++ b/ruby/proto/gitaly/hook_pb.rb @@ -5,6 +5,7 @@ require 'google/protobuf' require 'lint_pb' require 'shared_pb' +require 'google/protobuf/empty_pb' Google::Protobuf::DescriptorPool.generated_pool.build do add_file("hook.proto", :syntax => :proto3) do add_message "gitaly.PreReceiveHookRequest" do @@ -66,6 +67,10 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :stdout, :bytes, 1 optional :stderr, :bytes, 2 end + add_message "gitaly.PackObjectsHookStreamRequest" do + optional :repository, :message, 1, "gitaly.Repository" + repeated :args, :string, 2 + end end end @@ -81,4 +86,5 @@ module Gitaly ReferenceTransactionHookResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ReferenceTransactionHookResponse").msgclass PackObjectsHookRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.PackObjectsHookRequest").msgclass PackObjectsHookResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.PackObjectsHookResponse").msgclass + PackObjectsHookStreamRequest = 
::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.PackObjectsHookStreamRequest").msgclass end diff --git a/ruby/proto/gitaly/hook_services_pb.rb b/ruby/proto/gitaly/hook_services_pb.rb index 196ef344b52c7f5a10530b308f11c90889601ecc..66c6ea4a52cff52a43724f68eeba77b58e2df544 100644 --- a/ruby/proto/gitaly/hook_services_pb.rb +++ b/ruby/proto/gitaly/hook_services_pb.rb @@ -22,6 +22,7 @@ module Gitaly # uploadpack.packObjectsHook mechanism. It generates a stream of packed # Git objects. rpc :PackObjectsHook, stream(Gitaly::PackObjectsHookRequest), stream(Gitaly::PackObjectsHookResponse) + rpc :PackObjectsHookStream, Gitaly::PackObjectsHookStreamRequest, Google::Protobuf::Empty end Stub = Service.rpc_stub_class diff --git a/ruby/proto/gitaly/teststream_pb.rb b/ruby/proto/gitaly/teststream_pb.rb new file mode 100644 index 0000000000000000000000000000000000000000..d75050f048f6dec7fef0cf5d57e330b17a11b990 --- /dev/null +++ b/ruby/proto/gitaly/teststream_pb.rb @@ -0,0 +1,20 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: teststream.proto + +require 'google/protobuf' + +require 'lint_pb' +require 'shared_pb' +require 'google/protobuf/empty_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do + add_file("teststream.proto", :syntax => :proto3) do + add_message "gitaly.TestStreamRequest" do + optional :repository, :message, 1, "gitaly.Repository" + optional :size, :int64, 2 + end + end +end + +module Gitaly + TestStreamRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.TestStreamRequest").msgclass +end diff --git a/ruby/proto/gitaly/teststream_services_pb.rb b/ruby/proto/gitaly/teststream_services_pb.rb new file mode 100644 index 0000000000000000000000000000000000000000..d617668f5e6f2671bdd3f6422c0e57c9f8f0c7ff --- /dev/null +++ b/ruby/proto/gitaly/teststream_services_pb.rb @@ -0,0 +1,22 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# Source: teststream.proto for package 'gitaly' + +require 'grpc' +require 'teststream_pb' + +module Gitaly + module TestStreamService + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'gitaly.TestStreamService' + + rpc :TestStream, Gitaly::TestStreamRequest, Google::Protobuf::Empty + end + + Stub = Service.rpc_stub_class + end +end