diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go index 80e7dcddeb33f78b3ff404b81367e2deecc49e57..8962173319fcce8fcc6821270ffe4cbf72ec1b10 100644 --- a/cmd/gitaly-hooks/hooks_test.go +++ b/cmd/gitaly-hooks/hooks_test.go @@ -609,7 +609,7 @@ func (svc featureFlagAsserter) PackObjectsHook(stream gitalypb.HookService_PackO } func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client) { - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, featureFlagAsserter{ t: t, wrapped: hook.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory()), }) diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go index 777b0757d65c87b5b9cb426c7e33f3347f7156f0..cce031c4ca4dcc65e79a76e8ce014d34ba29c55e 100644 --- a/cmd/gitaly-ssh/auth_test.go +++ b/cmd/gitaly-ssh/auth_test.go @@ -23,6 +23,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -152,7 +153,8 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string, hookManager := hook.NewManager(locator, txManager, gitlab.NewMockClient(), cfg) gitCmdFactory := git.NewExecCommandFactory(cfg) diskCache := cache.New(cfg, locator) - srv, err := server.New(secure, cfg, testhelper.DiscardTestEntry(t), registry, diskCache) + streamRPCServer := streamrpc.NewServer() + srv, err := server.New(secure, cfg, testhelper.DiscardTestEntry(t), registry, diskCache, streamRPCServer) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ Cfg: cfg, diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index dd424300376598f19ff28c8f0a9b7c887e1aacc8..be48a6c203ba49e4c778288bee559952e47a97b7 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -29,6 +29,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" glog "gitlab.com/gitlab-org/gitaly/v14/internal/log" "gitlab.com/gitlab-org/gitaly/v14/internal/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/internal/version" "gitlab.com/gitlab-org/labkit/monitoring" @@ -196,6 +197,19 @@ func run(cfg config.Cfg) error { } defer rubySrv.Stop() + deps := &service.Dependencies{ + Cfg: cfg, + RubyServer: rubySrv, + GitalyHookManager: hookManager, + TransactionManager: transactionManager, + StorageLocator: locator, + ClientPool: conns, + GitCmdFactory: gitCmdFactory, + Linguist: ling, + CatfileCache: catfileCache, + DiskCache: diskCache, + } + for _, c := range []starter.Config{ {Name: starter.Unix, Addr: cfg.SocketPath, HandoverOnUpgrade: true}, {Name: starter.Unix, Addr: cfg.GitalyInternalSocketPath(), HandoverOnUpgrade: false}, @@ -206,32 +220,23 @@ func run(cfg config.Cfg) error { continue } - var srv *grpc.Server + var grpcSrv *grpc.Server + var srpcSrv *streamrpc.Server if c.HandoverOnUpgrade { - srv, err = gitalyServerFactory.CreateExternal(c.IsSecure()) + grpcSrv, srpcSrv, err = gitalyServerFactory.CreateExternal(c.IsSecure()) if err != nil { return fmt.Errorf("create external gRPC server: %w", err) } } else { - srv, err = gitalyServerFactory.CreateInternal() + grpcSrv, srpcSrv, err = gitalyServerFactory.CreateInternal() if err != nil { return fmt.Errorf("create internal gRPC server: %w", err) } } - setup.RegisterAll(srv, &service.Dependencies{ - Cfg: cfg, - RubyServer: rubySrv, - GitalyHookManager: hookManager, - TransactionManager: transactionManager, - StorageLocator: locator, - ClientPool: conns, - GitCmdFactory: gitCmdFactory, - Linguist: ling, - CatfileCache: catfileCache, - DiskCache: diskCache, - }) - b.RegisterStarter(starter.New(c, srv)) + setup.RegisterAll(grpcSrv, deps) + setup.RegisterAll(srpcSrv, deps) + b.RegisterStarter(starter.New(c, grpcSrv)) } if addr := cfg.PrometheusListenAddr; addr != "" { diff --git a/internal/git/remoterepo/repository_test.go b/internal/git/remoterepo/repository_test.go index b74f88f75813c1df9ac28c645440cb83eac086c8..aff19797da35885729fb4e958e31e8bb2a292819 100644 --- a/internal/git/remoterepo/repository_test.go +++ b/internal/git/remoterepo/repository_test.go @@ -22,7 +22,7 @@ import ( func TestRepository(t *testing.T) { cfg := testcfg.Build(t) - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/git/updateref/update_with_hooks_test.go b/internal/git/updateref/update_with_hooks_test.go index 3585ed5b0971079547b47210300588eba6c1e9bf..972bf6881c40fe9e24f373adb3eaff2227ce54c4 100644 --- a/internal/git/updateref/update_with_hooks_test.go +++ b/internal/git/updateref/update_with_hooks_test.go @@ -100,7 +100,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { // We need to set up a separate "real" hook service here, as it will be used in // git-update-ref(1) spawned by `updateRefWithHooks()` - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory())) }) diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 35959828e0377891e694cbbeb6fb6afd10ba79d1..6dee3c8212cf87322b6fa48279468c09398e131f 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -25,8 +25,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -86,30 +88,54 @@ func TestTLSSanity(t *testing.T) { func TestAuthFailures(t *testing.T) { testCases := []struct { - desc string - opts []grpc.DialOption - code codes.Code + desc string + creds credentials.PerRPCCredentials + code codes.Code + message string }{ - {desc: "no auth", opts: nil, code: codes.Unauthenticated}, { - desc: "invalid auth", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(brokenAuth{})}, - code: codes.Unauthenticated, + desc: "no auth", + creds: nil, + code: codes.Unauthenticated, + message: "rpc error: code = Unauthenticated desc = authentication required", }, { - desc: "wrong secret", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("foobar"))}, - code: codes.PermissionDenied, + desc: "invalid auth", + creds: brokenAuth{}, + code: codes.Unauthenticated, + message: "rpc error: code = Unauthenticated desc = authentication required", + }, + { + desc: "wrong secret", + creds: gitalyauth.RPCCredentialsV2("foobar"), + code: codes.PermissionDenied, + message: "rpc error: code = PermissionDenied desc = permission denied", }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - serverSocketPath := runServer(t, config.Cfg{Auth: auth.Config{Token: "quxbaz"}}) - connOpts := append(tc.opts, grpc.WithInsecure()) + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{ + Auth: auth.Config{Token: "quxbaz"}, + })) + serverSocketPath := runServer(t, cfg) + + // Make a healthcheck gRPC call + connOpts := []grpc.DialOption{grpc.WithInsecure()} + if tc.creds != nil { + connOpts = append(connOpts, grpc.WithPerRPCCredentials(tc.creds)) + } conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) t.Cleanup(func() { conn.Close() }) testhelper.RequireGrpcError(t, healthCheck(conn), tc.code) + + // // Make a streamRPC call + var callOpts []streamrpc.CallOption + if tc.creds != nil { + callOpts = append(callOpts, streamrpc.WithCredentials(tc.creds)) + } + _, _, err = checkStreamRPC(t, streamrpc.DialNet(serverSocketPath), repo, callOpts...) + require.EqualError(t, err, tc.message) }) } } @@ -119,40 +145,54 @@ func TestAuthSuccess(t *testing.T) { testCases := []struct { desc string - opts []grpc.DialOption + creds credentials.PerRPCCredentials required bool token string }{ {desc: "no auth, not required"}, { desc: "v2 correct auth, not required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))}, + creds: gitalyauth.RPCCredentialsV2(token), token: token, }, { desc: "v2 incorrect auth, not required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("incorrect"))}, + creds: gitalyauth.RPCCredentialsV2("incorrect"), token: token, }, { desc: "v2 correct auth, required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))}, + creds: gitalyauth.RPCCredentialsV2(token), token: token, required: true, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{ Auth: auth.Config{Token: tc.token, Transitioning: !tc.required}, })) serverSocketPath := runServer(t, cfg) - connOpts := append(tc.opts, grpc.WithInsecure()) + + // Make a healthcheck gRPC call + connOpts := []grpc.DialOption{grpc.WithInsecure()} + if tc.creds != nil { + connOpts = append(connOpts, grpc.WithPerRPCCredentials(tc.creds)) + } conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) t.Cleanup(func() { conn.Close() }) assert.NoError(t, healthCheck(conn), tc.desc) + + // // Make a streamRPC call + var callOpts []streamrpc.CallOption + if tc.creds != nil { + callOpts = append(callOpts, streamrpc.WithCredentials(tc.creds)) + } + in, out, err := checkStreamRPC(t, streamrpc.DialNet(serverSocketPath), repo, callOpts...) + require.NoError(t, err) + require.Equal(t, in, out) }) } } @@ -201,8 +241,10 @@ func runServer(t *testing.T, cfg config.Cfg) string { gitCmdFactory := git.NewExecCommandFactory(cfg) catfileCache := catfile.NewCache(cfg) diskCache := cache.New(cfg, locator) + streamRPCServer := streamrpc.NewServer() + gitalypb.RegisterTestStreamServiceServer(streamRPCServer, teststream.NewServer(locator)) - srv, err := New(false, cfg, testhelper.DiscardTestEntry(t), registry, diskCache) + srv, err := New(false, cfg, testhelper.DiscardTestEntry(t), registry, diskCache, streamRPCServer) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ @@ -236,7 +278,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { conns := client.NewPool() t.Cleanup(func() { conns.Close() }) - srv, err := New(true, cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + srv, err := New(true, cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), streamrpc.NewServer()) require.NoError(t, err) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index ffdf55c10dd847dc9a5042b6dd4eca617a1b892b..82b48f958e35423f61c1cb997731522cd66b7171 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -28,6 +28,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" "google.golang.org/grpc" @@ -74,6 +75,7 @@ func New( logrusEntry *log.Entry, registry *backchannel.Registry, cacheInvalidator diskcache.Invalidator, + streamRPCServer *streamrpc.Server, ) (*grpc.Server, error) { ctxTagOpts := []grpcmwtags.Option{ grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), @@ -96,53 +98,63 @@ func New( }) } + serverStreamInterceptorChain := grpcmw.ChainStreamServer( + grpcmwtags.StreamServerInterceptor(ctxTagOpts...), + grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.StreamInterceptor, + grpcprometheus.StreamServerInterceptor, + commandstatshandler.StreamInterceptor, + grpcmwlogrus.StreamServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + sentryhandler.StreamLogHandler, + cancelhandler.Stream, // Should be below LogHandler + auth.StreamServerInterceptor(cfg.Auth), + lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + grpctracing.StreamServerTracingInterceptor(), + cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.StreamPanicHandler, + ) + + serverUnaryInterceptorChain := grpcmw.ChainUnaryServer( + grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), + grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.UnaryInterceptor, + grpcprometheus.UnaryServerInterceptor, + commandstatshandler.UnaryInterceptor, + grpcmwlogrus.UnaryServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + sentryhandler.UnaryLogHandler, + cancelhandler.Unary, // Should be below LogHandler + auth.UnaryServerInterceptor(cfg.Auth), + lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + grpctracing.UnaryServerTracingInterceptor(), + cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.UnaryPanicHandler, + ) + + streamRPCServer.UseInterceptor(serverUnaryInterceptorChain) + lm := listenmux.New(transportCredentials) lm.Register(backchannel.NewServerHandshaker( logrusEntry, registry, []grpc.DialOption{client.UnaryInterceptor()}, )) + lm.Register(streamrpc.NewServerHandshaker( + streamRPCServer, + gitalylog.Default(), + )) opts := []grpc.ServerOption{ grpc.Creds(lm), - grpc.StreamInterceptor(grpcmw.ChainStreamServer( - grpcmwtags.StreamServerInterceptor(ctxTagOpts...), - grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.StreamInterceptor, - grpcprometheus.StreamServerInterceptor, - commandstatshandler.StreamInterceptor, - grpcmwlogrus.StreamServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), - sentryhandler.StreamLogHandler, - cancelhandler.Stream, // Should be below LogHandler - auth.StreamServerInterceptor(cfg.Auth), - lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.StreamServerTracingInterceptor(), - cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.StreamPanicHandler, - )), - grpc.UnaryInterceptor(grpcmw.ChainUnaryServer( - grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), - grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.UnaryInterceptor, - grpcprometheus.UnaryServerInterceptor, - commandstatshandler.UnaryInterceptor, - grpcmwlogrus.UnaryServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), - sentryhandler.UnaryLogHandler, - cancelhandler.Unary, // Should be below LogHandler - auth.UnaryServerInterceptor(cfg.Auth), - lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.UnaryServerTracingInterceptor(), - cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.UnaryPanicHandler, - )), + grpc.StreamInterceptor(serverStreamInterceptorChain), + grpc.UnaryInterceptor(serverUnaryInterceptorChain), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 20 * time.Second, PermitWithoutStream: true, diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 2b2d9e73dfdf5f3f67b4831d1bb9d52346180245..bae8fad63740e54e7c2db6bfa953ef8b138b7e1b 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -133,26 +134,30 @@ func (s *GitalyServerFactory) GracefulStop() { } } -// CreateExternal creates a new external gRPC server. The external servers are closed +// CreateExternal creates a new external gRPC server and StreamRPC server. The external servers are closed // before the internal servers when gracefully shutting down. -func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) { - server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator) +func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, *streamrpc.Server, error) { + streamRPCServer := streamrpc.NewServer() + grpcServer, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, streamRPCServer) if err != nil { - return nil, err + return nil, nil, err } - s.externalServers = append(s.externalServers, server) - return server, nil + s.externalServers = append(s.externalServers, grpcServer) + + return grpcServer, streamRPCServer, nil } -// CreateInternal creates a new internal gRPC server. Internal servers are closed +// CreateInternal creates a new internal gRPC server and StreamRPC server. Internal servers are closed // after the external ones when gracefully shutting down. -func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) { - server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator) +func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, *streamrpc.Server, error) { + streamRPCServer := streamrpc.NewServer() + grpcServer, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, streamRPCServer) if err != nil { - return nil, err + return nil, nil, err } - s.internalServers = append(s.internalServers, server) - return server, nil + s.internalServers = append(s.internalServers, grpcServer) + + return grpcServer, streamRPCServer, nil } diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index c851c32d7c5f12d0d680125b2600656d277663c9..34c9c46c376cba590749e627a53ab084e74e26aa 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -1,10 +1,14 @@ package server import ( + "bytes" "context" "crypto/tls" "crypto/x509" "errors" + "io" + "io/ioutil" + "math/rand" "net" "os" "testing" @@ -16,8 +20,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -30,71 +37,24 @@ func TestGitalyServerFactory(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - checkHealth := func(t *testing.T, sf *GitalyServerFactory, schema, addr string) healthpb.HealthClient { - t.Helper() - - var cc *grpc.ClientConn - if schema == starter.TLS { - listener, err := net.Listen(starter.TCP, addr) - require.NoError(t, err) - t.Cleanup(func() { listener.Close() }) - - srv, err := sf.CreateExternal(true) - require.NoError(t, err) - healthpb.RegisterHealthServer(srv, health.NewServer()) - go srv.Serve(listener) - - certPool, err := x509.SystemCertPool() - require.NoError(t, err) - - pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath) - require.True(t, certPool.AppendCertsFromPEM(pem)) - - creds := credentials.NewTLS(&tls.Config{ - RootCAs: certPool, - MinVersion: tls.VersionTLS12, - }) - - cc, err = grpc.DialContext(ctx, listener.Addr().String(), grpc.WithTransportCredentials(creds)) - require.NoError(t, err) - } else { - listener, err := net.Listen(schema, addr) - require.NoError(t, err) - t.Cleanup(func() { listener.Close() }) - - srv, err := sf.CreateExternal(false) - require.NoError(t, err) - healthpb.RegisterHealthServer(srv, health.NewServer()) - go srv.Serve(listener) - - endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String()) - require.NoError(t, err) - - cc, err = client.Dial(endpoint, nil) - require.NoError(t, err) - } - - t.Cleanup(func() { cc.Close() }) - - healthClient := healthpb.NewHealthClient(cc) + t.Run("insecure over TCP", func(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) + sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) - resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{}) - require.NoError(t, err) - require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status) - return healthClient - } + check(t, ctx, sf, cfg, repo, starter.TCP, "localhost:0") + }) - t.Run("insecure", func(t *testing.T) { - cfg := testcfg.Build(t) + t.Run("insecure over Unix Socket", func(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) - checkHealth(t, sf, starter.TCP, "localhost:0") + check(t, ctx, sf, cfg, repo, starter.Unix, testhelper.GetTemporaryGitalySocketFileName(t)) }) t.Run("secure", func(t *testing.T) { certFile, keyFile := testhelper.GenerateCerts(t) - cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{TLS: config.TLS{ + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{TLS: config.TLS{ CertPath: certFile, KeyPath: keyFile, }})) @@ -102,20 +62,20 @@ func TestGitalyServerFactory(t *testing.T) { sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) - checkHealth(t, sf, starter.TLS, "localhost:0") + check(t, ctx, sf, cfg, repo, starter.TLS, "localhost:0") }) t.Run("all services must be stopped", func(t *testing.T) { - cfg := testcfg.Build(t) + cfg, repo, _ := testcfg.BuildWithRepo(t) sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) - tcpHealthClient := checkHealth(t, sf, starter.TCP, "localhost:0") + tcpHealthClient := check(t, ctx, sf, cfg, repo, starter.TCP, "localhost:0") socket := testhelper.GetTemporaryGitalySocketFileName(t) t.Cleanup(func() { require.NoError(t, os.RemoveAll(socket)) }) - socketHealthClient := checkHealth(t, sf, starter.Unix, socket) + socketHealthClient := check(t, ctx, sf, cfg, repo, starter.Unix, socket) sf.GracefulStop() // stops all started servers(listeners) @@ -185,7 +145,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { }{ { createServer: func() *grpc.Server { - server, err := sf.CreateInternal() + server, _, err := sf.CreateInternal() require.NoError(t, err) return server }, @@ -195,7 +155,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { }, { createServer: func() *grpc.Server { - server, err := sf.CreateExternal(false) + server, _, err := sf.CreateExternal(false) require.NoError(t, err) return server }, @@ -287,3 +247,117 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { // wait until the graceful shutdown completes <-shutdownCompeleted } + +func check(t *testing.T, ctx context.Context, sf *GitalyServerFactory, cfg config.Cfg, repo *gitalypb.Repository, schema, addr string) healthpb.HealthClient { + t.Helper() + + var grpcConn *grpc.ClientConn + var streamRPCDial streamrpc.DialFunc + + if schema == starter.TLS { + listener, err := net.Listen(starter.TCP, addr) + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + grpcSrv, srpcSrv, err := sf.CreateExternal(true) + require.NoError(t, err) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) + registerStreamRPCServers(t, srpcSrv, cfg) + go grpcSrv.Serve(listener) + + certPool, err := x509.SystemCertPool() + require.NoError(t, err) + + pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath) + require.True(t, certPool.AppendCertsFromPEM(pem)) + + tlsConf := &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS12, + } + creds := credentials.NewTLS(tlsConf) + + streamRPCDial = streamrpc.DialTLS(listener.Addr().String(), tlsConf) + grpcConn, err = grpc.DialContext(ctx, listener.Addr().String(), grpc.WithTransportCredentials(creds)) + require.NoError(t, err) + } else { + listener, err := net.Listen(schema, addr) + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + grpcSrv, srpcSrv, err := sf.CreateExternal(false) + require.NoError(t, err) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) + registerStreamRPCServers(t, srpcSrv, cfg) + go grpcSrv.Serve(listener) + + endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String()) + require.NoError(t, err) + + streamRPCDial = streamrpc.DialNet(endpoint) + grpcConn, err = client.Dial(endpoint, nil) + require.NoError(t, err) + } + + // Make a healthcheck gRPC call + t.Cleanup(func() { grpcConn.Close() }) + healthClient := healthpb.NewHealthClient(grpcConn) + + resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{}) + require.NoError(t, err) + require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status) + + // Make a streamRPC call + in, out, err := checkStreamRPC(t, streamRPCDial, repo) + require.NoError(t, err) + require.Equal(t, in, out, "byte stream works") + + return healthClient +} + +func registerStreamRPCServers(t *testing.T, srv *streamrpc.Server, cfg config.Cfg) { + gitalypb.RegisterTestStreamServiceServer(srv, teststream.NewServer(config.NewLocator(cfg))) +} + +func checkStreamRPC(t *testing.T, dial streamrpc.DialFunc, repo *gitalypb.Repository, opts ...streamrpc.CallOption) ([]byte, []byte, error) { + ctx, cancel := testhelper.Context() + defer cancel() + + const size = 1024 * 1024 + + in := make([]byte, size) + _, err := rand.Read(in) + require.NoError(t, err) + + var out []byte + require.NotEqual(t, in, out) + + err = streamrpc.Call( + ctx, + dial, + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: repo, + Size: size, + }, + func(c net.Conn) error { + errC := make(chan error, 1) + go func() { + var err error + out, err = ioutil.ReadAll(c) + errC <- err + }() + + if _, err := io.Copy(c, bytes.NewReader(in)); err != nil { + return err + } + if err := <-errC; err != nil { + return err + } + + return nil + }, + opts..., + ) + return in, out, err +} diff --git a/internal/gitaly/service/blob/testhelper_test.go b/internal/gitaly/service/blob/testhelper_test.go index e7b64bd503b20d295ad589b655b634633301fdd2..a0de32432e9863910966706bddc0ed3852c710dd 100644 --- a/internal/gitaly/service/blob/testhelper_test.go +++ b/internal/gitaly/service/blob/testhelper_test.go @@ -34,7 +34,7 @@ func setup(t *testing.T) (config.Cfg, *gitalypb.Repository, string, gitalypb.Blo repo, repoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name()) t.Cleanup(cleanup) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterBlobServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/cleanup/testhelper_test.go b/internal/gitaly/service/cleanup/testhelper_test.go index d16a8bd4d645d5ff43151a84c3677deabe05fb43..8dc286effee28eb5885482984a8f43b5c4fb540c 100644 --- a/internal/gitaly/service/cleanup/testhelper_test.go +++ b/internal/gitaly/service/cleanup/testhelper_test.go @@ -37,7 +37,7 @@ func setupCleanupService(t *testing.T) (config.Cfg, *gitalypb.Repository, string } func runCleanupServiceServer(t *testing.T, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterCleanupServiceServer(srv, NewServer( deps.GetCfg(), deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/commit/testhelper_test.go b/internal/gitaly/service/commit/testhelper_test.go index 28eef0f7876ce7688931b24183c5145888c39e90..a28fb70ff12c25796f5d17d6c19092ab7aac7960 100644 --- a/internal/gitaly/service/commit/testhelper_test.go +++ b/internal/gitaly/service/commit/testhelper_test.go @@ -66,7 +66,7 @@ func setupCommitServiceCreateRepo( func startTestServices(t testing.TB, cfg config.Cfg) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterCommitServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/conflicts/testhelper_test.go b/internal/gitaly/service/conflicts/testhelper_test.go index c8f14d617939742060711fd06075fbb60c3ee027..9b55622b243065ecc0e4a65df0c8d5223a925d23 100644 --- a/internal/gitaly/service/conflicts/testhelper_test.go +++ b/internal/gitaly/service/conflicts/testhelper_test.go @@ -84,7 +84,7 @@ func SetupConflictsService(t testing.TB, bare bool, hookManager hook.Manager) (c } func runConflictsServer(t testing.TB, cfg config.Cfg, hookManager hook.Manager) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterConflictsServiceServer(srv, NewServer( deps.GetCfg(), deps.GetHookManager(), diff --git a/internal/gitaly/service/diff/testhelper_test.go b/internal/gitaly/service/diff/testhelper_test.go index 1029cd3ed999b3a1ba80103df2091e9b297f3808..c787d35fc25429f22f90d7f5f5503192e2806988 100644 --- a/internal/gitaly/service/diff/testhelper_test.go +++ b/internal/gitaly/service/diff/testhelper_test.go @@ -27,7 +27,7 @@ func testMain(m *testing.M) int { func setupDiffService(t testing.TB, opt ...testserver.GitalyServerOpt) (config.Cfg, *gitalypb.Repository, string, gitalypb.DiffServiceClient) { cfg, repo, repoPath := testcfg.BuildWithRepo(t) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterDiffServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go index dcbdd56e207f6cc548617c8fcf5c8a77d0a5a98a..cd2630bfd355ccfbabed8298d082f343efae9490 100644 --- a/internal/gitaly/service/hook/testhelper_test.go +++ b/internal/gitaly/service/hook/testhelper_test.go @@ -55,7 +55,7 @@ type serverOption func(*server) func runHooksServer(t testing.TB, cfg config.Cfg, opts []serverOption, serverOpts ...testserver.GitalyServerOpt) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { hookServer := NewServer( deps.GetCfg(), gitalyhook.NewManager(deps.GetLocator(), deps.GetTxManager(), deps.GetGitlabClient(), deps.GetCfg()), diff --git a/internal/gitaly/service/internalgitaly/testhelper_test.go b/internal/gitaly/service/internalgitaly/testhelper_test.go index d768d2d80a93ee105b47c276349bac8fc46a1232..f092df9313a3b062ff8ac359084776e798b85d99 100644 --- a/internal/gitaly/service/internalgitaly/testhelper_test.go +++ b/internal/gitaly/service/internalgitaly/testhelper_test.go @@ -25,7 +25,7 @@ func testMain(m *testing.M) int { } func setupInternalGitalyService(t *testing.T, cfg config.Cfg, internalService gitalypb.InternalGitalyServer) gitalypb.InternalGitalyClient { - add := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + add := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterInternalGitalyServer(srv, internalService) }, testserver.WithDisablePraefect()) conn, err := grpc.Dial(add, grpc.WithInsecure()) diff --git a/internal/gitaly/service/namespace/testhelper_test.go b/internal/gitaly/service/namespace/testhelper_test.go index a37b5a03adc38b6daf7c69b380434d338d08c40c..a3851a41aa05b9356a5e60dcc4f56d5eba71e24a 100644 --- a/internal/gitaly/service/namespace/testhelper_test.go +++ b/internal/gitaly/service/namespace/testhelper_test.go @@ -17,7 +17,7 @@ func setupNamespaceService(t testing.TB, opts ...testserver.GitalyServerOpt) (co cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "other")) cfg := cfgBuilder.Build(t) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterNamespaceServiceServer(srv, NewServer(deps.GetLocator())) }, opts...) diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index 7777a638c3b784cda7ae9ab643270b61a35c9222..f1b70c22046b522c45d97cb7cc1e3a34dd512e6b 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -46,7 +46,7 @@ func setup(t *testing.T, opts ...testserver.GitalyServerOpt) (config.Cfg, *gital } func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, logger *logrus.Logger, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterObjectPoolServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index 3f082d51737f986af8319ee1fe7b0fd336165195..ceea9984281e23837259e0056ca697180184ecd5 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -122,7 +122,7 @@ func TestUserCreateBranchWithTransaction(t *testing.T) { transactionServer := &testTransactionServer{} cfg.ListenAddr = "127.0.0.1:0" // runs gitaly on the TCP address - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, @@ -483,7 +483,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, diff --git a/internal/gitaly/service/operations/tags_test.go b/internal/gitaly/service/operations/tags_test.go index c709da02c70a66b5c49a4e722690aca561aac010..f2f5eaa34408b8da2ac51587f2c035a89fccba19 100644 --- a/internal/gitaly/service/operations/tags_test.go +++ b/internal/gitaly/service/operations/tags_test.go @@ -279,7 +279,7 @@ func TestUserCreateTagWithTransaction(t *testing.T) { // runOperationServiceServer as it puts a Praefect server in between if // running Praefect tests, which would break our test setup. transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index 0582b12d74e6bc87adea0ce5e383eae9f3df147f..59506a9315572f29f629bfe4c0225053edeec950 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -110,7 +110,7 @@ func setupOperationsServiceWithRuby( func runOperationServiceServer(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server, options ...testserver.GitalyServerOpt) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go index ba28bf018a80f48393505f4c8bce161c1d0b0581..677e81d3e52f8b730010acecee2706b3cdf75f69 100644 --- a/internal/gitaly/service/ref/delete_refs_test.go +++ b/internal/gitaly/service/ref/delete_refs_test.go @@ -91,7 +91,7 @@ func TestDeleteRefs_transaction(t *testing.T) { }, } - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go index 55683f0957cc2739cf092887935a66b0f25461b3..eda08dc8bf3737f6ba56c2cf6ba46fb2c7133c31 100644 --- a/internal/gitaly/service/ref/testhelper_test.go +++ b/internal/gitaly/service/ref/testhelper_test.go @@ -70,7 +70,7 @@ func setupRefServiceWithoutRepo(t testing.TB) (config.Cfg, gitalypb.RefServiceCl } func runRefServiceServer(t testing.TB, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/remote/fetch_internal_remote_test.go b/internal/gitaly/service/remote/fetch_internal_remote_test.go index 35ec8e25c7aef089f2c069eb519177ed7306ce66..f142b6deccf3584005610824cc3c4fa7d81dc9fc 100644 --- a/internal/gitaly/service/remote/fetch_internal_remote_test.go +++ b/internal/gitaly/service/remote/fetch_internal_remote_test.go @@ -150,7 +150,7 @@ func TestSuccessfulFetchInternalRemote(t *testing.T) { testhelper.ConfigureGitalyHooksBin(t, remoteCfg) - remoteAddr := testserver.RunGitalyServer(t, remoteCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + remoteAddr := testserver.RunGitalyServer(t, remoteCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer( deps.GetCfg(), deps.GetLocator(), @@ -180,7 +180,7 @@ func TestSuccessfulFetchInternalRemote(t *testing.T) { getGitalySSHInvocationParams := listenGitalySSHCalls(t, localCfg) hookManager := &mockHookManager{} - localAddr := testserver.RunGitalyServer(t, localCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + localAddr := testserver.RunGitalyServer(t, localCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/remote/testhelper_test.go b/internal/gitaly/service/remote/testhelper_test.go index dac64c19ad3bffbcef49c39af9d36a6a361e4c22..f200e13a664827dea5b8b38af6433053007d73c5 100644 --- a/internal/gitaly/service/remote/testhelper_test.go +++ b/internal/gitaly/service/remote/testhelper_test.go @@ -57,7 +57,7 @@ func setupRemoteServiceWithRuby(t *testing.T, cfg config.Cfg, rubySrv *rubyserve repo, repoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name()) t.Cleanup(cleanup) - addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/remote/update_remote_mirror_test.go b/internal/gitaly/service/remote/update_remote_mirror_test.go index 85b9a46cef5b49340b3a16cb5ddeb858ddb4eabc..7ca63df0dc8656ccfb105f9dd15ef0b0933678e9 100644 --- a/internal/gitaly/service/remote/update_remote_mirror_test.go +++ b/internal/gitaly/service/remote/update_remote_mirror_test.go @@ -503,7 +503,7 @@ func TestUpdateRemoteMirror(t *testing.T) { } } - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { cmdFactory := deps.GetGitCmdFactory() if tc.wrapCommandFactory != nil { cmdFactory = tc.wrapCommandFactory(t, deps.GetGitCmdFactory()) @@ -575,7 +575,7 @@ func TestSuccessfulUpdateRemoteMirrorRequest(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -681,7 +681,7 @@ func TestSuccessfulUpdateRemoteMirrorRequestWithWildcards(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -768,7 +768,7 @@ func TestUpdateRemoteMirrorInmemory(t *testing.T) { cfg := testcfg.Build(t) - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -819,7 +819,7 @@ func TestSuccessfulUpdateRemoteMirrorRequestWithKeepDivergentRefs(t *testing.T) ctx, cancel := testhelper.Context() defer cancel() - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -911,7 +911,7 @@ func TestFailedUpdateRemoteMirrorRequestDueToValidation(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/apply_gitattributes_test.go b/internal/gitaly/service/repository/apply_gitattributes_test.go index a19ee3c2b29882686a2da407e2da780766c9d994..a3c093a9ae0c1d6bb32fff5b43f8062a00d1eb3e 100644 --- a/internal/gitaly/service/repository/apply_gitattributes_test.go +++ b/internal/gitaly/service/repository/apply_gitattributes_test.go @@ -90,7 +90,7 @@ func TestApplyGitattributesWithTransaction(t *testing.T) { cfg, repo, repoPath := testcfg.BuildWithRepo(t) transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go index 8dc6751278df473c6dd60ea014337fc1933dbae9..34f3b84756f4e0e8b4ac86c6397356bee4ac0990 100644 --- a/internal/gitaly/service/repository/fetch_remote_test.go +++ b/internal/gitaly/service/repository/fetch_remote_test.go @@ -221,7 +221,7 @@ func TestFetchRemote_transaction(t *testing.T) { sourceCfg, _, sourceRepoPath := testcfg.BuildWithRepo(t) txManager := &mockTxManager{} - addr := testserver.RunGitalyServer(t, sourceCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, sourceCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/fork_test.go b/internal/gitaly/service/repository/fork_test.go index 8a387c6615aee790659b17423f2ec521b82e1840..76542080a37ab7a409b4f1869beef697220c3b50 100644 --- a/internal/gitaly/service/repository/fork_test.go +++ b/internal/gitaly/service/repository/fork_test.go @@ -28,6 +28,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -217,7 +218,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s registry := backchannel.NewRegistry() locator := config.NewLocator(cfg) cache := cache.New(cfg, locator) - server, err := gserver.New(true, cfg, testhelper.DiscardTestEntry(t), registry, cache) + server, err := gserver.New(true, cfg, testhelper.DiscardTestEntry(t), registry, cache, streamrpc.NewServer()) require.NoError(t, err) listener, addr := testhelper.GetLocalhostListener(t) @@ -239,7 +240,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s // protected by the same TLS certificate. cfg.TLS.KeyPath = "" - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory())) }) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index a892283936ab403d6e72bd4094802a5ef2500a76..a29096e3cf7f5339f5b436a55afd117049382337 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -392,7 +392,7 @@ func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { } func runServerWithBadFetchInternalRemote(t *testing.T, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go index 62be9ecc4a9ebcffabdf0e2c94dd20e072204607..4f85f87ce631e5bd134f650dc387def48ac3e2fb 100644 --- a/internal/gitaly/service/repository/testhelper_test.go +++ b/internal/gitaly/service/repository/testhelper_test.go @@ -119,7 +119,7 @@ func assertModTimeAfter(t *testing.T, afterTime time.Time, paths ...string) bool } func runRepositoryServerWithConfig(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( cfg, deps.GetRubyServer(), diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go index e253264cd5467e9441aa1258e8aa217b90ec0586..fa2102d34c784e06f71576c4e6eea454935ae89a 100644 --- a/internal/gitaly/service/server/info_test.go +++ b/internal/gitaly/service/server/info_test.go @@ -57,7 +57,7 @@ func TestGitalyServerInfo(t *testing.T) { } func runServer(t *testing.T, cfg config.Cfg, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterServerServiceServer(srv, NewServer(deps.GetGitCmdFactory(), deps.GetCfg().Storages)) }, opts...) } diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 31859d84306f59ac2a5a5a75d149bf2e79459d4d..827906f3a0a89aee920fcb0277cfa52fd48de406 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -21,6 +21,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/server" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/smarthttp" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/wiki" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" @@ -51,8 +52,8 @@ var ( ) ) -// RegisterAll will register all the known gRPC services on the provided gRPC service instance. -func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { +// RegisterAll will register all the known gRPC + StreamRPC services +func RegisterAll(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterBlobServiceServer(srv, blob.NewServer( deps.GetCfg(), deps.GetLocator(), @@ -143,6 +144,13 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(deps.GetCfg().Storages)) healthpb.RegisterHealthServer(srv, health.NewServer()) - reflection.Register(srv) - grpcprometheus.Register(srv) + + gitalypb.RegisterTestStreamServiceServer(srv, teststream.NewServer( + deps.GetLocator(), + )) + + if gs, ok := srv.(*grpc.Server); ok { + reflection.Register(gs) + grpcprometheus.Register(gs) + } } diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index 818ad582fc48ac50a64bdde1b517edcf663f5938..4a294235b377c47307595a43aca603dcf572266e 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -622,7 +622,7 @@ func TestPostReceiveWithReferenceTransactionHook(t *testing.T) { refTransactionServer := &testTransactionServer{} - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index b2639798539ffd1cba91234f8fce0f08ce6a3b93..2ee3d2ddc7fdeec3499cab9b710d162bf3bf8400 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -36,7 +36,7 @@ func testMain(m *testing.M) int { } func startSmartHTTPServer(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) testserver.GitalyServer { - return testserver.StartGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.StartGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/ssh/testhelper_test.go b/internal/gitaly/service/ssh/testhelper_test.go index d99c6b2abbabb8c9effc95f5ec7afced06604126..2695c7650366a0265c41d02e38cdf65862b20c56 100644 --- a/internal/gitaly/service/ssh/testhelper_test.go +++ b/internal/gitaly/service/ssh/testhelper_test.go @@ -31,7 +31,7 @@ func runSSHServer(t *testing.T, cfg config.Cfg, serverOpts ...testserver.GitalyS } func runSSHServerWithOptions(t *testing.T, cfg config.Cfg, opts []ServerOpt, serverOpts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSSHServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/teststream/server.go b/internal/gitaly/service/teststream/server.go new file mode 100644 index 0000000000000000000000000000000000000000..51256b2267928e0f34870e166e28caa603249a90 --- /dev/null +++ b/internal/gitaly/service/teststream/server.go @@ -0,0 +1,37 @@ +package teststream + +import ( + "context" + "io" + + "gitlab.com/gitlab-org/gitaly/v14/internal/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +type server struct { + gitalypb.UnimplementedTestStreamServiceServer + locator storage.Locator +} + +func (s *server) TestStream(ctx context.Context, request *gitalypb.TestStreamRequest) (*emptypb.Empty, error) { + if _, err := s.locator.GetRepoPath(request.Repository); err != nil { + return nil, err + } + + c, err := streamrpc.AcceptConnection(ctx) + if err != nil { + return nil, err + } + + _, err = io.CopyN(c, c, request.Size) + return nil, err +} + +// NewServer creates a new instance of a grpc ServerServiceServer +func NewServer(locator storage.Locator) gitalypb.TestStreamServiceServer { + return &server{ + locator: locator, + } +} diff --git a/internal/gitaly/service/teststream/server_test.go b/internal/gitaly/service/teststream/server_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8051cf78758237cadbc5cda4548c44588eb80313 --- /dev/null +++ b/internal/gitaly/service/teststream/server_test.go @@ -0,0 +1,115 @@ +package teststream + +import ( + "bytes" + "io" + "io/ioutil" + "math/rand" + "net" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +func TestTestStreamPingPong(t *testing.T) { + const size = 1024 * 1024 + + addr, repo := runGitalyServer(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + in := make([]byte, size) + _, err := rand.Read(in) + require.NoError(t, err) + + var out []byte + require.NotEqual(t, in, out) + require.NoError(t, streamrpc.Call( + ctx, + streamrpc.DialNet(addr), + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: repo, + Size: size, + }, + func(c net.Conn) error { + errC := make(chan error, 1) + go func() { + var err error + out, err = ioutil.ReadAll(c) + errC <- err + }() + + if _, err := io.Copy(c, bytes.NewReader(in)); err != nil { + return err + } + if err := <-errC; err != nil { + return err + } + + return nil + }, + )) + + require.Equal(t, in, out, "byte stream works") +} + +func TestTestStreamPingPongWithInvalidRepo(t *testing.T) { + addr, repo := runGitalyServer(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + err := streamrpc.Call( + ctx, + streamrpc.DialNet(addr), + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: &gitalypb.Repository{ + StorageName: repo.StorageName, + RelativePath: "@hashed/94/00/notexist.git", + GlRepository: repo.GlRepository, + GlProjectPath: repo.GlProjectPath, + }, + Size: 1024 * 1024, + }, + func(c net.Conn) error { + panic("Should not reach here") + }, + ) + + require.Error(t, err) + require.Contains( + t, err.Error(), + "rpc error: code = NotFound desc = GetRepoPath: not a git repository", + ) +} + +func runGitalyServer(t *testing.T) (string, *gitalypb.Repository) { + t.Helper() + testhelper.Configure() + + cfg, repo, _ := testcfg.BuildWithRepo(t) + + addr := testserver.RunGitalyServer( + t, cfg, nil, + func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { + gitalypb.RegisterTestStreamServiceServer(srv, NewServer(deps.GetLocator())) + }, + // TODO: At the moment, stream RPC doesn't work well with Praefect, + // hence we have to disable Praefect. We can remove this option after + // https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 is + // done + testserver.WithDisablePraefect(), + ) + + return addr, repo +} diff --git a/internal/gitaly/service/wiki/testhelper_test.go b/internal/gitaly/service/wiki/testhelper_test.go index 85d2d1cc40c8c8b6c6f82d35fc0f416412996ee7..61ccfd3d1b8659f62a400323eeffbd2ba7978bdb 100644 --- a/internal/gitaly/service/wiki/testhelper_test.go +++ b/internal/gitaly/service/wiki/testhelper_test.go @@ -85,7 +85,7 @@ func TestWithRubySidecar(t *testing.T) { } func setupWikiService(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server) gitalypb.WikiServiceClient { - addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterWikiServiceServer(srv, NewServer(deps.GetRubyServer(), deps.GetLocator())) }) client := newWikiClient(t, addr) diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go index 028df3d20afce272054bbf6b273ed652f58d6ca4..6878a070838411e5e62775f2ae65f0c929909a31 100644 --- a/internal/gitaly/transaction/manager_test.go +++ b/internal/gitaly/transaction/manager_test.go @@ -191,7 +191,7 @@ func TestPoolManager_Stop(t *testing.T) { func runTransactionServer(t *testing.T, cfg config.Cfg) (*testTransactionServer, string) { transactionServer := &testTransactionServer{} cfg.ListenAddr = ":0" // pushes gRPC to listen on the TCP address - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefTransactionServer(srv, transactionServer) }, testserver.WithDisablePraefect()) return transactionServer, addr diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 811f40c87da1d23a722214565abae91bdc984943..8d0814afe7def18ca653d6cc541859871ad2cbd0 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1451,7 +1451,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { wg: &wg, } - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, operationServer) }, testserver.WithDiskCache(&mockDiskCache{})) diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index 4fd1a69fbbb7ebb2a3494989a4e99d51632293f6..8e00abd6fd03ef347885fa0a24091079d1bfc7d9 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -31,7 +31,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { testRepo = repo } cfgs = append(cfgs, cfg) - cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv *grpc.Server, deps *service.Dependencies) { + cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b5675f8022da0280e8579920e381cc82aed1c8e2..a4fc45e8530594d1830a4f2b620611d003a4d6f4 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -570,7 +570,7 @@ func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) func runMockRepositoryServer(t *testing.T, cfg gconfig.Cfg) (*mockServer, string) { mockServer := newMockRepositoryServer() - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, mockServer) gitalypb.RegisterRefServiceServer(srv, mockServer) }) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 71812100e74e7da43bdd64a5f8366993b9a8d133..22da58a6b8ee37b602aac2c1d79a417d49adfffd 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -724,7 +724,7 @@ func (m *mockSmartHTTP) Called(method string) int { } func newSmartHTTPGrpcServer(t *testing.T, cfg gconfig.Cfg, smartHTTPService gitalypb.SmartHTTPServiceServer) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, smartHTTPService) }, testserver.WithDisablePraefect()) } diff --git a/internal/streamrpc/handshaker.go b/internal/streamrpc/handshaker.go new file mode 100644 index 0000000000000000000000000000000000000000..54548af86d8efac453732bee0f01e14959309796 --- /dev/null +++ b/internal/streamrpc/handshaker.go @@ -0,0 +1,137 @@ +package streamrpc + +import ( + "crypto/tls" + "fmt" + "net" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" + "google.golang.org/grpc/credentials" +) + +// The magic bytes used for classification by listenmux +var magicBytes = []byte("streamrpc00") + +// DialNet lets Call initiate unencrypted connections. They tend to be used +// with Gitaly's listenmux multiplexer only. After the connection is +// established, streamrpc's 11-byte magic bytes are written into the wire. +// Listemmux peeks into these magic bytes and redirects the request to +// StreamRPC server. +// Please visit internal/listenmux/mux.go for more information +func DialNet(address string) DialFunc { + return func(t time.Duration) (net.Conn, error) { + endpoint, err := starter.ParseEndpoint(address) + if err != nil { + return nil, err + } + + // Dial-only deadline + deadline := time.Now().Add(t) + + dialer := &net.Dialer{Deadline: deadline} + conn, err := dialer.Dial(endpoint.Name, endpoint.Addr) + if err != nil { + return nil, err + } + + if err = conn.SetDeadline(deadline); err != nil { + return nil, err + } + // Write the magic bytes on the connection so the server knows we're + // about to initiate a multiplexing session. + if _, err := conn.Write(magicBytes); err != nil { + return nil, fmt.Errorf("streamrpc client: write backchannel magic bytes: %w", err) + } + + // Reset deadline of tls connection for later stages + if err = conn.SetDeadline(time.Time{}); err != nil { + return nil, err + } + + return conn, nil + } +} + +// DialTLS lets Call initiate TLS connections. Similar to DialNet, the +// connections are used for listenmux multiplexer. There are 3 steps involving: +// - TCP handshake +// - TLS handshake +// - Write streamrpc magic bytes +func DialTLS(address string, cfg *tls.Config) DialFunc { + return func(t time.Duration) (net.Conn, error) { + // Dial-only deadline + deadline := time.Now().Add(t) + + dialer := &net.Dialer{Deadline: deadline} + tlsConn, err := tls.DialWithDialer(dialer, "tcp", address, cfg) + if err != nil { + return nil, err + } + + err = tlsConn.SetDeadline(deadline) + if err != nil { + return nil, err + } + // Write the magic bytes on the connection so the server knows we're + // about to initiate a multiplexing session. + if _, err := tlsConn.Write(magicBytes); err != nil { + return nil, fmt.Errorf("streamrpc client: write backchannel magic bytes: %w", err) + } + + // Reset deadline of tls connection for later stages + if err = tlsConn.SetDeadline(time.Time{}); err != nil { + return nil, err + } + + return tlsConn, nil + } +} + +// ServerHandshaker implements the server side handshake of the multiplexed connection. +type ServerHandshaker struct { + server *Server + logger logrus.FieldLogger +} + +// NewServerHandshaker returns an implementation of streamrpc server +// handshaker. The provided TransportCredentials are handshaked prior to +// initializing the multiplexing session. This handshaker Gitaly's unary server +// interceptors into the interceptor chain of input StreamRPC server. +func NewServerHandshaker(server *Server, logger logrus.FieldLogger) *ServerHandshaker { + return &ServerHandshaker{ + server: server, + logger: logger, + } +} + +// Magic is used by listenmux to retrieve the magic string for +// streamrpc connections. +func (s *ServerHandshaker) Magic() string { return string(magicBytes) } + +// Handshake "steals" the request from Gitaly's main gRPC server during +// connection handshaking phase. Listenmux depends on the first 11-byte magic +// bytes sent by the client, and invoke StreamRPC handshaker accordingly. The +// request is then handled by stream RPC server, and skipped by Gitaly gRPC +// server. +func (s *ServerHandshaker) Handshake(conn net.Conn, authInfo credentials.AuthInfo) (net.Conn, credentials.AuthInfo, error) { + if err := conn.SetDeadline(time.Time{}); err != nil { + return nil, nil, err + } + + go func() { + if err := s.server.Handle(conn); err != nil { + s.logger.WithError(err).Error("streamrpc: handle call") + } + }() + // At this point, the connection is already closed. If the + // TransportCredentials continues its code path, gRPC constructs a HTTP2 + // server transport to handle the connection. Eventually, it fails and logs + // several warnings and errors even though the stream RPC call is + // successful. + // Fortunately, gRPC has credentials.ErrConnDispatched, indicating that the + // connection is already dispatched out of gRPC. gRPC should leave it alone + // and exit in peace. + return nil, nil, credentials.ErrConnDispatched +} diff --git a/internal/streamrpc/rpc_test.go b/internal/streamrpc/rpc_test.go index 850838465223558a2d1601122ceb255fcd1cba1b..c9344803662c6d644b5a037e2041d1f7268f2a02 100644 --- a/internal/streamrpc/rpc_test.go +++ b/internal/streamrpc/rpc_test.go @@ -185,18 +185,20 @@ func TestCall_serverMiddleware(t *testing.T) { ) interceptorDone := make(chan struct{}) + server := NewServer() + server.UseInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer close(interceptorDone) + middlewareMethod = info.FullMethod + receivedField = req.(*testpb.StreamRequest).StringField + if md, ok := metadata.FromIncomingContext(ctx); ok { + receivedValues = md[testKey] + } + return handler(ctx, req) + }) dial := startServer( t, - NewServer(WithServerInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - defer close(interceptorDone) - middlewareMethod = info.FullMethod - receivedField = req.(*testpb.StreamRequest).StringField - if md, ok := metadata.FromIncomingContext(ctx); ok { - receivedValues = md[testKey] - } - return handler(ctx, req) - })), + server, func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { _, err := AcceptConnection(ctx) return nil, err @@ -219,15 +221,11 @@ func TestCall_serverMiddleware(t *testing.T) { } func TestCall_serverMiddlewareReject(t *testing.T) { - dial := startServer( - t, - NewServer(WithServerInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - return nil, errors.New("middleware says no") - })), - func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { - panic("never reached") - }, - ) + server := NewServer() + server.UseInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return nil, errors.New("middleware says no") + }) + dial := startServer(t, server, func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { panic("never reached") }) err := Call( context.Background(), diff --git a/internal/streamrpc/server.go b/internal/streamrpc/server.go index 502764fd47ff5a36add567356906d7b024b1bc38..2c4771af5fb20768a262a37befd9bdefcd6e6682 100644 --- a/internal/streamrpc/server.go +++ b/internal/streamrpc/server.go @@ -30,11 +30,6 @@ type method struct { // options to NewServer. type ServerOption func(*Server) -// WithServerInterceptor adds a unary gRPC server interceptor. -func WithServerInterceptor(interceptor grpc.UnaryServerInterceptor) ServerOption { - return func(s *Server) { s.interceptor = interceptor } -} - // NewServer returns a new StreamRPC server. You can pass the result to // grpc-go RegisterFooServer functions. func NewServer(opts ...ServerOption) *Server { @@ -60,6 +55,12 @@ func (s *Server) RegisterService(sd *grpc.ServiceDesc, impl interface{}) { } } +// UseInterceptor adds a unary gRPC server interceptor for the StreamRPC +// server to use. +func (s *Server) UseInterceptor(interceptor grpc.UnaryServerInterceptor) { + s.interceptor = interceptor +} + // Handle handles an incoming network connection with the StreamRPC // protocol. It is intended to be called from a net.Listener.Accept loop // (or something equivalent). diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index df54fa39c5d6b1c14506bcd673e6cd15c1ad054c..399aed5371ae96c1f85720a50dd89e7906617d9c 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -41,7 +41,7 @@ import ( // It accepts addition Registrar to register all required service instead of // calling service.RegisterAll explicitly because it creates a circular dependency // when the function is used in on of internal/gitaly/service/... packages. -func RunGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) string { +func RunGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) string { _, gitalyAddr, disablePraefect := runGitaly(t, cfg, rubyServer, registrar, opts...) praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") @@ -135,7 +135,7 @@ func (gs GitalyServer) Address() string { } // StartGitalyServer creates and runs gitaly (and praefect as a proxy) server. -func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) GitalyServer { +func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) GitalyServer { gitalySrv, gitalyAddr, disablePraefect := runGitaly(t, cfg, rubyServer, registrar, opts...) praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") @@ -199,7 +199,7 @@ func IsHealthy(conn *grpc.ClientConn, timeout time.Duration) bool { return true } -func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { +func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { t.Helper() var gsd gitalyServerDeps @@ -210,20 +210,21 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi deps := gsd.createDependencies(t, cfg, rubyServer) t.Cleanup(func() { gsd.conns.Close() }) - srv, err := server.NewGitalyServerFactory( + grpcSrv, srpcSrv, err := server.NewGitalyServerFactory( cfg, gsd.logger.WithField("test", t.Name()), deps.GetBackchannelRegistry(), deps.GetDiskCache(), ).CreateExternal(cfg.TLS.CertPath != "" && cfg.TLS.KeyPath != "") require.NoError(t, err) - t.Cleanup(srv.Stop) + t.Cleanup(grpcSrv.Stop) - registrar(srv, deps) - if _, found := srv.GetServiceInfo()["grpc.health.v1.Health"]; !found { + registrar(grpcSrv, deps) + registrar(srpcSrv, deps) + if _, found := grpcSrv.GetServiceInfo()["grpc.health.v1.Health"]; !found { // we should register health service as it is used for the health checks // praefect service executes periodically (and on the bootstrap step) - healthpb.RegisterHealthServer(srv, health.NewServer()) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) } // listen on internal socket @@ -243,7 +244,7 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi internalListener, err := net.Listen("unix", cfg.GitalyInternalSocketPath()) require.NoError(t, err) - go srv.Serve(internalListener) + go grpcSrv.Serve(internalListener) } var listener net.Listener @@ -266,9 +267,9 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi addr = "unix://" + serverSocketPath } - go srv.Serve(listener) + go grpcSrv.Serve(listener) - return srv, addr, gsd.disablePraefect + return grpcSrv, addr, gsd.disablePraefect } type gitalyServerDeps struct { diff --git a/proto/go/gitalypb/protolist.go b/proto/go/gitalypb/protolist.go index a15916f7044c5f2a915ae137ecd7dbeec0dfc489..9d26e24a784506a9b84de3bcb03c1f7bffe9c541 100644 --- a/proto/go/gitalypb/protolist.go +++ b/proto/go/gitalypb/protolist.go @@ -23,6 +23,7 @@ var GitalyProtos = []string{ "shared.proto", "smarthttp.proto", "ssh.proto", + "teststream.proto", "transaction.proto", "wiki.proto", } diff --git a/proto/go/gitalypb/teststream.pb.go b/proto/go/gitalypb/teststream.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..d6a68abce3e6c3e0e313adc78b32324b9ca43423 --- /dev/null +++ b/proto/go/gitalypb/teststream.pb.go @@ -0,0 +1,173 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.17.3 +// source: teststream.proto + +package gitalypb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type TestStreamRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` + Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` +} + +func (x *TestStreamRequest) Reset() { + *x = TestStreamRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_teststream_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestStreamRequest) ProtoMessage() {} + +func (x *TestStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_teststream_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestStreamRequest.ProtoReflect.Descriptor instead. +func (*TestStreamRequest) Descriptor() ([]byte, []int) { + return file_teststream_proto_rawDescGZIP(), []int{0} +} + +func (x *TestStreamRequest) GetRepository() *Repository { + if x != nil { + return x.Repository + } + return nil +} + +func (x *TestStreamRequest) GetSize() int64 { + if x != nil { + return x.Size + } + return 0 +} + +var File_teststream_proto protoreflect.FileDescriptor + +var file_teststream_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x74, 0x65, 0x73, 0x74, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x0a, 0x6c, 0x69, 0x6e, 0x74, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x61, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, + 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, + 0x73, 0x69, 0x7a, 0x65, 0x32, 0x5c, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x47, 0x0a, 0x0a, 0x54, 0x65, 0x73, + 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, + 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_teststream_proto_rawDescOnce sync.Once + file_teststream_proto_rawDescData = file_teststream_proto_rawDesc +) + +func file_teststream_proto_rawDescGZIP() []byte { + file_teststream_proto_rawDescOnce.Do(func() { + file_teststream_proto_rawDescData = protoimpl.X.CompressGZIP(file_teststream_proto_rawDescData) + }) + return file_teststream_proto_rawDescData +} + +var file_teststream_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_teststream_proto_goTypes = []interface{}{ + (*TestStreamRequest)(nil), // 0: gitaly.TestStreamRequest + (*Repository)(nil), // 1: gitaly.Repository + (*emptypb.Empty)(nil), // 2: google.protobuf.Empty +} +var file_teststream_proto_depIdxs = []int32{ + 1, // 0: gitaly.TestStreamRequest.repository:type_name -> gitaly.Repository + 0, // 1: gitaly.TestStreamService.TestStream:input_type -> gitaly.TestStreamRequest + 2, // 2: gitaly.TestStreamService.TestStream:output_type -> google.protobuf.Empty + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_teststream_proto_init() } +func file_teststream_proto_init() { + if File_teststream_proto != nil { + return + } + file_lint_proto_init() + file_shared_proto_init() + if !protoimpl.UnsafeEnabled { + file_teststream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestStreamRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_teststream_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_teststream_proto_goTypes, + DependencyIndexes: file_teststream_proto_depIdxs, + MessageInfos: file_teststream_proto_msgTypes, + }.Build() + File_teststream_proto = out.File + file_teststream_proto_rawDesc = nil + file_teststream_proto_goTypes = nil + file_teststream_proto_depIdxs = nil +} diff --git a/proto/go/gitalypb/teststream_grpc.pb.go b/proto/go/gitalypb/teststream_grpc.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..e8b4c5fcb68c1776b26faeb7b279392e980c7c2a --- /dev/null +++ b/proto/go/gitalypb/teststream_grpc.pb.go @@ -0,0 +1,102 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package gitalypb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// TestStreamServiceClient is the client API for TestStreamService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TestStreamServiceClient interface { + TestStream(ctx context.Context, in *TestStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type testStreamServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTestStreamServiceClient(cc grpc.ClientConnInterface) TestStreamServiceClient { + return &testStreamServiceClient{cc} +} + +func (c *testStreamServiceClient) TestStream(ctx context.Context, in *TestStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/gitaly.TestStreamService/TestStream", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TestStreamServiceServer is the server API for TestStreamService service. +// All implementations must embed UnimplementedTestStreamServiceServer +// for forward compatibility +type TestStreamServiceServer interface { + TestStream(context.Context, *TestStreamRequest) (*emptypb.Empty, error) + mustEmbedUnimplementedTestStreamServiceServer() +} + +// UnimplementedTestStreamServiceServer must be embedded to have forward compatible implementations. +type UnimplementedTestStreamServiceServer struct { +} + +func (UnimplementedTestStreamServiceServer) TestStream(context.Context, *TestStreamRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestStream not implemented") +} +func (UnimplementedTestStreamServiceServer) mustEmbedUnimplementedTestStreamServiceServer() {} + +// UnsafeTestStreamServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TestStreamServiceServer will +// result in compilation errors. +type UnsafeTestStreamServiceServer interface { + mustEmbedUnimplementedTestStreamServiceServer() +} + +func RegisterTestStreamServiceServer(s grpc.ServiceRegistrar, srv TestStreamServiceServer) { + s.RegisterService(&TestStreamService_ServiceDesc, srv) +} + +func _TestStreamService_TestStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TestStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestStreamServiceServer).TestStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.TestStreamService/TestStream", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestStreamServiceServer).TestStream(ctx, req.(*TestStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// TestStreamService_ServiceDesc is the grpc.ServiceDesc for TestStreamService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TestStreamService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "gitaly.TestStreamService", + HandlerType: (*TestStreamServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "TestStream", + Handler: _TestStreamService_TestStream_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "teststream.proto", +} diff --git a/proto/teststream.proto b/proto/teststream.proto new file mode 100644 index 0000000000000000000000000000000000000000..734047887f708a247d6301041ea76addcc2d8a26 --- /dev/null +++ b/proto/teststream.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package gitaly; + +option go_package = "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"; + +import "lint.proto"; +import "shared.proto"; +import "google/protobuf/empty.proto"; + +service TestStreamService { + rpc TestStream(TestStreamRequest) returns (google.protobuf.Empty) { + option (op_type) = { + op: ACCESSOR + }; + } +} + +message TestStreamRequest { + Repository repository = 1 [(target_repository)=true]; + int64 size = 2; +} diff --git a/ruby/proto/gitaly.rb b/ruby/proto/gitaly.rb index 9c80cea63b406246751f0c22dcf0d0c007d4a7bc..9cfff81cf0f07d90af86272847a82b6d2279196d 100644 --- a/ruby/proto/gitaly.rb +++ b/ruby/proto/gitaly.rb @@ -37,6 +37,8 @@ require 'gitaly/smarthttp_services_pb' require 'gitaly/ssh_services_pb' +require 'gitaly/teststream_services_pb' + require 'gitaly/transaction_services_pb' require 'gitaly/wiki_services_pb' diff --git a/ruby/proto/gitaly/teststream_pb.rb b/ruby/proto/gitaly/teststream_pb.rb new file mode 100644 index 0000000000000000000000000000000000000000..d75050f048f6dec7fef0cf5d57e330b17a11b990 --- /dev/null +++ b/ruby/proto/gitaly/teststream_pb.rb @@ -0,0 +1,20 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: teststream.proto + +require 'google/protobuf' + +require 'lint_pb' +require 'shared_pb' +require 'google/protobuf/empty_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do + add_file("teststream.proto", :syntax => :proto3) do + add_message "gitaly.TestStreamRequest" do + optional :repository, :message, 1, "gitaly.Repository" + optional :size, :int64, 2 + end + end +end + +module Gitaly + TestStreamRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.TestStreamRequest").msgclass +end diff --git a/ruby/proto/gitaly/teststream_services_pb.rb b/ruby/proto/gitaly/teststream_services_pb.rb new file mode 100644 index 0000000000000000000000000000000000000000..c74774b038f8c7be0d67c0c7520ec9957d87881e --- /dev/null +++ b/ruby/proto/gitaly/teststream_services_pb.rb @@ -0,0 +1,22 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: teststream.proto for package 'gitaly' + +require 'grpc' +require 'teststream_pb' + +module Gitaly + module TestStreamService + class Service + + include ::GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'gitaly.TestStreamService' + + rpc :TestStream, ::Gitaly::TestStreamRequest, ::Google::Protobuf::Empty + end + + Stub = Service.rpc_stub_class + end +end