From 388a390534066e6d6f410f878a09c3201f8c17b5 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Tue, 27 Jul 2021 14:40:36 +0000 Subject: [PATCH 1/7] Use grpc.ServiceRegistrar in setup.RegisterAll --- cmd/gitaly-hooks/hooks_test.go | 2 +- internal/git/remoterepo/repository_test.go | 2 +- internal/git/updateref/update_with_hooks_test.go | 2 +- internal/gitaly/service/blob/testhelper_test.go | 2 +- internal/gitaly/service/cleanup/testhelper_test.go | 2 +- internal/gitaly/service/commit/testhelper_test.go | 2 +- internal/gitaly/service/conflicts/testhelper_test.go | 2 +- internal/gitaly/service/diff/testhelper_test.go | 2 +- internal/gitaly/service/hook/testhelper_test.go | 2 +- .../gitaly/service/internalgitaly/testhelper_test.go | 2 +- internal/gitaly/service/namespace/testhelper_test.go | 2 +- .../gitaly/service/objectpool/testhelper_test.go | 2 +- internal/gitaly/service/operations/branches_test.go | 4 ++-- internal/gitaly/service/operations/tags_test.go | 2 +- .../gitaly/service/operations/testhelper_test.go | 2 +- internal/gitaly/service/ref/delete_refs_test.go | 2 +- internal/gitaly/service/ref/testhelper_test.go | 2 +- .../service/remote/fetch_internal_remote_test.go | 4 ++-- internal/gitaly/service/remote/testhelper_test.go | 2 +- .../service/remote/update_remote_mirror_test.go | 12 ++++++------ .../service/repository/apply_gitattributes_test.go | 2 +- .../gitaly/service/repository/fetch_remote_test.go | 2 +- internal/gitaly/service/repository/fork_test.go | 2 +- internal/gitaly/service/repository/replicate_test.go | 2 +- .../gitaly/service/repository/testhelper_test.go | 2 +- internal/gitaly/service/server/info_test.go | 2 +- internal/gitaly/service/setup/register.go | 9 ++++++--- .../gitaly/service/smarthttp/receive_pack_test.go | 2 +- internal/gitaly/service/smarthttp/testhelper_test.go | 2 +- internal/gitaly/service/ssh/testhelper_test.go | 2 +- internal/gitaly/service/wiki/testhelper_test.go | 2 +- internal/gitaly/transaction/manager_test.go | 2 +- internal/praefect/coordinator_test.go | 2 +- internal/praefect/info_service_test.go | 2 +- internal/praefect/replicator_test.go | 2 +- internal/praefect/server_test.go | 2 +- internal/testhelper/testserver/gitaly.go | 6 +++--- 37 files changed, 51 insertions(+), 48 deletions(-) diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go index d0f219bc8b7..424dd60a3e2 100644 --- a/cmd/gitaly-hooks/hooks_test.go +++ b/cmd/gitaly-hooks/hooks_test.go @@ -608,7 +608,7 @@ func (svc featureFlagAsserter) PackObjectsHook(stream gitalypb.HookService_PackO } func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client) { - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, featureFlagAsserter{ t: t, wrapped: hook.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory()), }) diff --git a/internal/git/remoterepo/repository_test.go b/internal/git/remoterepo/repository_test.go index b74f88f7581..aff19797da3 100644 --- a/internal/git/remoterepo/repository_test.go +++ b/internal/git/remoterepo/repository_test.go @@ -22,7 +22,7 @@ import ( func TestRepository(t *testing.T) { cfg := testcfg.Build(t) - serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/git/updateref/update_with_hooks_test.go b/internal/git/updateref/update_with_hooks_test.go index f5740c70c82..da0e40dd26d 100644 --- a/internal/git/updateref/update_with_hooks_test.go +++ b/internal/git/updateref/update_with_hooks_test.go @@ -97,7 +97,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { // We need to set up a separate "real" hook service here, as it will be used in // git-update-ref(1) spawned by `updateRefWithHooks()` - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory())) }) diff --git a/internal/gitaly/service/blob/testhelper_test.go b/internal/gitaly/service/blob/testhelper_test.go index e7b64bd503b..a0de32432e9 100644 --- a/internal/gitaly/service/blob/testhelper_test.go +++ b/internal/gitaly/service/blob/testhelper_test.go @@ -34,7 +34,7 @@ func setup(t *testing.T) (config.Cfg, *gitalypb.Repository, string, gitalypb.Blo repo, repoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name()) t.Cleanup(cleanup) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterBlobServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/cleanup/testhelper_test.go b/internal/gitaly/service/cleanup/testhelper_test.go index d16a8bd4d64..8dc286effee 100644 --- a/internal/gitaly/service/cleanup/testhelper_test.go +++ b/internal/gitaly/service/cleanup/testhelper_test.go @@ -37,7 +37,7 @@ func setupCleanupService(t *testing.T) (config.Cfg, *gitalypb.Repository, string } func runCleanupServiceServer(t *testing.T, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterCleanupServiceServer(srv, NewServer( deps.GetCfg(), deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/commit/testhelper_test.go b/internal/gitaly/service/commit/testhelper_test.go index 28eef0f7876..a28fb70ff12 100644 --- a/internal/gitaly/service/commit/testhelper_test.go +++ b/internal/gitaly/service/commit/testhelper_test.go @@ -66,7 +66,7 @@ func setupCommitServiceCreateRepo( func startTestServices(t testing.TB, cfg config.Cfg) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterCommitServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/conflicts/testhelper_test.go b/internal/gitaly/service/conflicts/testhelper_test.go index c8f14d61793..9b55622b243 100644 --- a/internal/gitaly/service/conflicts/testhelper_test.go +++ b/internal/gitaly/service/conflicts/testhelper_test.go @@ -84,7 +84,7 @@ func SetupConflictsService(t testing.TB, bare bool, hookManager hook.Manager) (c } func runConflictsServer(t testing.TB, cfg config.Cfg, hookManager hook.Manager) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterConflictsServiceServer(srv, NewServer( deps.GetCfg(), deps.GetHookManager(), diff --git a/internal/gitaly/service/diff/testhelper_test.go b/internal/gitaly/service/diff/testhelper_test.go index 1029cd3ed99..c787d35fc25 100644 --- a/internal/gitaly/service/diff/testhelper_test.go +++ b/internal/gitaly/service/diff/testhelper_test.go @@ -27,7 +27,7 @@ func testMain(m *testing.M) int { func setupDiffService(t testing.TB, opt ...testserver.GitalyServerOpt) (config.Cfg, *gitalypb.Repository, string, gitalypb.DiffServiceClient) { cfg, repo, repoPath := testcfg.BuildWithRepo(t) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterDiffServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go index dcbdd56e207..cd2630bfd35 100644 --- a/internal/gitaly/service/hook/testhelper_test.go +++ b/internal/gitaly/service/hook/testhelper_test.go @@ -55,7 +55,7 @@ type serverOption func(*server) func runHooksServer(t testing.TB, cfg config.Cfg, opts []serverOption, serverOpts ...testserver.GitalyServerOpt) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { hookServer := NewServer( deps.GetCfg(), gitalyhook.NewManager(deps.GetLocator(), deps.GetTxManager(), deps.GetGitlabClient(), deps.GetCfg()), diff --git a/internal/gitaly/service/internalgitaly/testhelper_test.go b/internal/gitaly/service/internalgitaly/testhelper_test.go index d768d2d80a9..f092df9313a 100644 --- a/internal/gitaly/service/internalgitaly/testhelper_test.go +++ b/internal/gitaly/service/internalgitaly/testhelper_test.go @@ -25,7 +25,7 @@ func testMain(m *testing.M) int { } func setupInternalGitalyService(t *testing.T, cfg config.Cfg, internalService gitalypb.InternalGitalyServer) gitalypb.InternalGitalyClient { - add := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + add := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterInternalGitalyServer(srv, internalService) }, testserver.WithDisablePraefect()) conn, err := grpc.Dial(add, grpc.WithInsecure()) diff --git a/internal/gitaly/service/namespace/testhelper_test.go b/internal/gitaly/service/namespace/testhelper_test.go index a37b5a03adc..a3851a41aa0 100644 --- a/internal/gitaly/service/namespace/testhelper_test.go +++ b/internal/gitaly/service/namespace/testhelper_test.go @@ -17,7 +17,7 @@ func setupNamespaceService(t testing.TB, opts ...testserver.GitalyServerOpt) (co cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "other")) cfg := cfgBuilder.Build(t) - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterNamespaceServiceServer(srv, NewServer(deps.GetLocator())) }, opts...) diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index 7777a638c3b..f1b70c22046 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -46,7 +46,7 @@ func setup(t *testing.T, opts ...testserver.GitalyServerOpt) (config.Cfg, *gital } func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, logger *logrus.Logger, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterObjectPoolServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index 3f082d51737..ceea9984281 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -122,7 +122,7 @@ func TestUserCreateBranchWithTransaction(t *testing.T) { transactionServer := &testTransactionServer{} cfg.ListenAddr = "127.0.0.1:0" // runs gitaly on the TCP address - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, @@ -483,7 +483,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, diff --git a/internal/gitaly/service/operations/tags_test.go b/internal/gitaly/service/operations/tags_test.go index 6b8adc277d3..3cc6b55473c 100644 --- a/internal/gitaly/service/operations/tags_test.go +++ b/internal/gitaly/service/operations/tags_test.go @@ -275,7 +275,7 @@ func TestUserCreateTagWithTransaction(t *testing.T) { // runOperationServiceServer as it puts a Praefect server in between if // running Praefect tests, which would break our test setup. transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), nil, diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index cda7418f360..36fcb8124c2 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -103,7 +103,7 @@ func setupOperationsServiceWithRuby( func runOperationServiceServer(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server, options ...testserver.GitalyServerOpt) string { t.Helper() - return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go index ba28bf018a8..677e81d3e52 100644 --- a/internal/gitaly/service/ref/delete_refs_test.go +++ b/internal/gitaly/service/ref/delete_refs_test.go @@ -91,7 +91,7 @@ func TestDeleteRefs_transaction(t *testing.T) { }, } - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go index 55683f0957c..eda08dc8bf3 100644 --- a/internal/gitaly/service/ref/testhelper_test.go +++ b/internal/gitaly/service/ref/testhelper_test.go @@ -70,7 +70,7 @@ func setupRefServiceWithoutRepo(t testing.TB) (config.Cfg, gitalypb.RefServiceCl } func runRefServiceServer(t testing.TB, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/remote/fetch_internal_remote_test.go b/internal/gitaly/service/remote/fetch_internal_remote_test.go index 35ec8e25c7a..f142b6deccf 100644 --- a/internal/gitaly/service/remote/fetch_internal_remote_test.go +++ b/internal/gitaly/service/remote/fetch_internal_remote_test.go @@ -150,7 +150,7 @@ func TestSuccessfulFetchInternalRemote(t *testing.T) { testhelper.ConfigureGitalyHooksBin(t, remoteCfg) - remoteAddr := testserver.RunGitalyServer(t, remoteCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + remoteAddr := testserver.RunGitalyServer(t, remoteCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer( deps.GetCfg(), deps.GetLocator(), @@ -180,7 +180,7 @@ func TestSuccessfulFetchInternalRemote(t *testing.T) { getGitalySSHInvocationParams := listenGitalySSHCalls(t, localCfg) hookManager := &mockHookManager{} - localAddr := testserver.RunGitalyServer(t, localCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + localAddr := testserver.RunGitalyServer(t, localCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/remote/testhelper_test.go b/internal/gitaly/service/remote/testhelper_test.go index e7d3d026286..8962ebb4074 100644 --- a/internal/gitaly/service/remote/testhelper_test.go +++ b/internal/gitaly/service/remote/testhelper_test.go @@ -63,7 +63,7 @@ func setupRemoteServiceWithRuby(t *testing.T, cfg config.Cfg, rubySrv *rubyserve repo, repoPath, cleanup := gittest.CloneRepoAtStorage(t, cfg, cfg.Storages[0], t.Name()) t.Cleanup(cleanup) - addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/remote/update_remote_mirror_test.go b/internal/gitaly/service/remote/update_remote_mirror_test.go index b29a8cb8095..17a19eb7069 100644 --- a/internal/gitaly/service/remote/update_remote_mirror_test.go +++ b/internal/gitaly/service/remote/update_remote_mirror_test.go @@ -494,7 +494,7 @@ func testUpdateRemoteMirrorFeatured(t *testing.T, ctx context.Context, cfg confi } } - addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { cmdFactory := deps.GetGitCmdFactory() if tc.wrapCommandFactory != nil { cmdFactory = tc.wrapCommandFactory(t, deps.GetGitCmdFactory()) @@ -567,7 +567,7 @@ func testSuccessfulUpdateRemoteMirrorRequest(t *testing.T, cfg config.Cfg, rubyS } func testSuccessfulUpdateRemoteMirrorRequestFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -674,7 +674,7 @@ func testSuccessfulUpdateRemoteMirrorRequestWithWildcards(t *testing.T, cfg conf } func testSuccessfulUpdateRemoteMirrorRequestWithWildcardsFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -757,7 +757,7 @@ func testSuccessfulUpdateRemoteMirrorRequestWithWildcardsFeatured(t *testing.T, } func testUpdateRemoteMirrorInmemory(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -830,7 +830,7 @@ func testSuccessfulUpdateRemoteMirrorRequestWithKeepDivergentRefs(t *testing.T, } func testSuccessfulUpdateRemoteMirrorRequestWithKeepDivergentRefsFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), @@ -923,7 +923,7 @@ func testFailedUpdateRemoteMirrorRequestDueToValidation(t *testing.T, cfg config } func testFailedUpdateRemoteMirrorRequestDueToValidationFeatured(t *testing.T, ctx context.Context, cfg config.Cfg, rubySrv *rubyserver.Server) { - serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + serverSocketPath := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRemoteServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/apply_gitattributes_test.go b/internal/gitaly/service/repository/apply_gitattributes_test.go index a19ee3c2b29..a3c093a9ae0 100644 --- a/internal/gitaly/service/repository/apply_gitattributes_test.go +++ b/internal/gitaly/service/repository/apply_gitattributes_test.go @@ -90,7 +90,7 @@ func TestApplyGitattributesWithTransaction(t *testing.T) { cfg, repo, repoPath := testcfg.BuildWithRepo(t) transactionServer := &testTransactionServer{} - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/fetch_remote_test.go b/internal/gitaly/service/repository/fetch_remote_test.go index 8dc6751278d..34f3b84756f 100644 --- a/internal/gitaly/service/repository/fetch_remote_test.go +++ b/internal/gitaly/service/repository/fetch_remote_test.go @@ -221,7 +221,7 @@ func TestFetchRemote_transaction(t *testing.T) { sourceCfg, _, sourceRepoPath := testcfg.BuildWithRepo(t) txManager := &mockTxManager{} - addr := testserver.RunGitalyServer(t, sourceCfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, sourceCfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/fork_test.go b/internal/gitaly/service/repository/fork_test.go index 8a387c6615a..af3b440a47c 100644 --- a/internal/gitaly/service/repository/fork_test.go +++ b/internal/gitaly/service/repository/fork_test.go @@ -239,7 +239,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s // protected by the same TLS certificate. cfg.TLS.KeyPath = "" - testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory())) }) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index a892283936a..a29096e3cf7 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -392,7 +392,7 @@ func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { } func runServerWithBadFetchInternalRemote(t *testing.T, cfg config.Cfg) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go index 62be9ecc4a9..4f85f87ce63 100644 --- a/internal/gitaly/service/repository/testhelper_test.go +++ b/internal/gitaly/service/repository/testhelper_test.go @@ -119,7 +119,7 @@ func assertModTimeAfter(t *testing.T, afterTime time.Time, paths ...string) bool } func runRepositoryServerWithConfig(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, NewServer( cfg, deps.GetRubyServer(), diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go index e253264cd54..fa2102d34c7 100644 --- a/internal/gitaly/service/server/info_test.go +++ b/internal/gitaly/service/server/info_test.go @@ -57,7 +57,7 @@ func TestGitalyServerInfo(t *testing.T) { } func runServer(t *testing.T, cfg config.Cfg, opts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterServerServiceServer(srv, NewServer(deps.GetGitCmdFactory(), deps.GetCfg().Storages)) }, opts...) } diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 31859d84306..e92aeac59dd 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -52,7 +52,7 @@ var ( ) // RegisterAll will register all the known gRPC services on the provided gRPC service instance. -func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { +func RegisterAll(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterBlobServiceServer(srv, blob.NewServer( deps.GetCfg(), deps.GetLocator(), @@ -143,6 +143,9 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(deps.GetCfg().Storages)) healthpb.RegisterHealthServer(srv, health.NewServer()) - reflection.Register(srv) - grpcprometheus.Register(srv) + + if gs, ok := srv.(*grpc.Server); ok { + reflection.Register(gs) + grpcprometheus.Register(gs) + } } diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index 1ca4e84e799..c83650b1af5 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -621,7 +621,7 @@ func TestPostReceiveWithReferenceTransactionHook(t *testing.T) { refTransactionServer := &testTransactionServer{} - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index b2639798539..2ee3d2ddc7f 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -36,7 +36,7 @@ func testMain(m *testing.M) int { } func startSmartHTTPServer(t *testing.T, cfg config.Cfg, serverOpts ...ServerOpt) testserver.GitalyServer { - return testserver.StartGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.StartGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/ssh/testhelper_test.go b/internal/gitaly/service/ssh/testhelper_test.go index d99c6b2abba..2695c765036 100644 --- a/internal/gitaly/service/ssh/testhelper_test.go +++ b/internal/gitaly/service/ssh/testhelper_test.go @@ -31,7 +31,7 @@ func runSSHServer(t *testing.T, cfg config.Cfg, serverOpts ...testserver.GitalyS } func runSSHServerWithOptions(t *testing.T, cfg config.Cfg, opts []ServerOpt, serverOpts ...testserver.GitalyServerOpt) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSSHServiceServer(srv, NewServer( deps.GetCfg(), deps.GetLocator(), diff --git a/internal/gitaly/service/wiki/testhelper_test.go b/internal/gitaly/service/wiki/testhelper_test.go index 85d2d1cc40c..61ccfd3d1b8 100644 --- a/internal/gitaly/service/wiki/testhelper_test.go +++ b/internal/gitaly/service/wiki/testhelper_test.go @@ -85,7 +85,7 @@ func TestWithRubySidecar(t *testing.T) { } func setupWikiService(t testing.TB, cfg config.Cfg, rubySrv *rubyserver.Server) gitalypb.WikiServiceClient { - addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, rubySrv, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterWikiServiceServer(srv, NewServer(deps.GetRubyServer(), deps.GetLocator())) }) client := newWikiClient(t, addr) diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go index 028df3d20af..6878a070838 100644 --- a/internal/gitaly/transaction/manager_test.go +++ b/internal/gitaly/transaction/manager_test.go @@ -191,7 +191,7 @@ func TestPoolManager_Stop(t *testing.T) { func runTransactionServer(t *testing.T, cfg config.Cfg) (*testTransactionServer, string) { transactionServer := &testTransactionServer{} cfg.ListenAddr = ":0" // pushes gRPC to listen on the TCP address - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRefTransactionServer(srv, transactionServer) }, testserver.WithDisablePraefect()) return transactionServer, addr diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index d7dcdd369e2..914d32565a6 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1451,7 +1451,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { wg: &wg, } - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterOperationServiceServer(srv, operationServer) }, testserver.WithDiskCache(&mockDiskCache{})) diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index 4fd1a69fbbb..8e00abd6fd0 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -31,7 +31,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { testRepo = repo } cfgs = append(cfgs, cfg) - cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv *grpc.Server, deps *service.Dependencies) { + cfgs[i].SocketPath = testserver.RunGitalyServer(t, cfgs[i], nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer( deps.GetCfg(), deps.GetRubyServer(), diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b5675f8022d..a4fc45e8530 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -570,7 +570,7 @@ func (m *mockServer) PackRefs(ctx context.Context, in *gitalypb.PackRefsRequest) func runMockRepositoryServer(t *testing.T, cfg gconfig.Cfg) (*mockServer, string) { mockServer := newMockRepositoryServer() - addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + addr := testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterRepositoryServiceServer(srv, mockServer) gitalypb.RegisterRefServiceServer(srv, mockServer) }) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 71812100e74..22da58a6b8e 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -724,7 +724,7 @@ func (m *mockSmartHTTP) Called(method string) int { } func newSmartHTTPGrpcServer(t *testing.T, cfg gconfig.Cfg, smartHTTPService gitalypb.SmartHTTPServiceServer) string { - return testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { + return testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterSmartHTTPServiceServer(srv, smartHTTPService) }, testserver.WithDisablePraefect()) } diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index df54fa39c5d..5b878a7f3d8 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -41,7 +41,7 @@ import ( // It accepts addition Registrar to register all required service instead of // calling service.RegisterAll explicitly because it creates a circular dependency // when the function is used in on of internal/gitaly/service/... packages. -func RunGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) string { +func RunGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) string { _, gitalyAddr, disablePraefect := runGitaly(t, cfg, rubyServer, registrar, opts...) praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") @@ -135,7 +135,7 @@ func (gs GitalyServer) Address() string { } // StartGitalyServer creates and runs gitaly (and praefect as a proxy) server. -func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) GitalyServer { +func StartGitalyServer(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) GitalyServer { gitalySrv, gitalyAddr, disablePraefect := runGitaly(t, cfg, rubyServer, registrar, opts...) praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") @@ -199,7 +199,7 @@ func IsHealthy(conn *grpc.ClientConn, timeout time.Duration) bool { return true } -func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv *grpc.Server, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { +func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, registrar func(srv grpc.ServiceRegistrar, deps *service.Dependencies), opts ...GitalyServerOpt) (*grpc.Server, string, bool) { t.Helper() var gsd gitalyServerDeps -- GitLab From be8d56cdeacef95f306a17e7ea303b03d22ef632 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 27 Jul 2021 14:43:00 +0000 Subject: [PATCH 2/7] Combine listenmux and streamrpc into a publicly exposed TestStream RPC - Add a new RPC TestStream to gitaly/proto, which uses the StreamRPC protocol - Put all the pieces together so that Gitaly can accept StreamRPC calls and handle the TestStream RPC over TCP, TLS, Unix sockets - The first iteration should include metrics, logging and authentication Changelog: added --- cmd/gitaly-ssh/auth_test.go | 4 +- cmd/gitaly/main.go | 37 ++-- internal/gitaly/server/auth_test.go | 82 ++++++-- internal/gitaly/server/server.go | 88 ++++---- internal/gitaly/server/server_factory.go | 29 +-- internal/gitaly/server/server_factory_test.go | 198 ++++++++++++------ .../gitaly/service/repository/fork_test.go | 3 +- internal/gitaly/service/setup/register.go | 7 +- internal/gitaly/service/teststream/server.go | 37 ++++ .../gitaly/service/teststream/server_test.go | 115 ++++++++++ internal/streamrpc/handshaker.go | 137 ++++++++++++ internal/streamrpc/rpc_test.go | 34 ++- internal/streamrpc/server.go | 11 +- internal/testhelper/testserver/gitaly.go | 17 +- proto/go/gitalypb/protolist.go | 1 + proto/go/gitalypb/teststream.pb.go | 173 +++++++++++++++ proto/go/gitalypb/teststream_grpc.pb.go | 102 +++++++++ proto/teststream.proto | 22 ++ ruby/proto/gitaly.rb | 2 + ruby/proto/gitaly/teststream_pb.rb | 20 ++ ruby/proto/gitaly/teststream_services_pb.rb | 22 ++ 21 files changed, 959 insertions(+), 182 deletions(-) create mode 100644 internal/gitaly/service/teststream/server.go create mode 100644 internal/gitaly/service/teststream/server_test.go create mode 100644 internal/streamrpc/handshaker.go create mode 100644 proto/go/gitalypb/teststream.pb.go create mode 100644 proto/go/gitalypb/teststream_grpc.pb.go create mode 100644 proto/teststream.proto create mode 100644 ruby/proto/gitaly/teststream_pb.rb create mode 100644 ruby/proto/gitaly/teststream_services_pb.rb diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go index 777b0757d65..cce031c4ca4 100644 --- a/cmd/gitaly-ssh/auth_test.go +++ b/cmd/gitaly-ssh/auth_test.go @@ -23,6 +23,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -152,7 +153,8 @@ func runServer(t *testing.T, secure bool, cfg config.Cfg, connectionType string, hookManager := hook.NewManager(locator, txManager, gitlab.NewMockClient(), cfg) gitCmdFactory := git.NewExecCommandFactory(cfg) diskCache := cache.New(cfg, locator) - srv, err := server.New(secure, cfg, testhelper.DiscardTestEntry(t), registry, diskCache) + streamRPCServer := streamrpc.NewServer() + srv, err := server.New(secure, cfg, testhelper.DiscardTestEntry(t), registry, diskCache, streamRPCServer) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ Cfg: cfg, diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index dd424300376..be48a6c203b 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -29,6 +29,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" glog "gitlab.com/gitlab-org/gitaly/v14/internal/log" "gitlab.com/gitlab-org/gitaly/v14/internal/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/tempdir" "gitlab.com/gitlab-org/gitaly/v14/internal/version" "gitlab.com/gitlab-org/labkit/monitoring" @@ -196,6 +197,19 @@ func run(cfg config.Cfg) error { } defer rubySrv.Stop() + deps := &service.Dependencies{ + Cfg: cfg, + RubyServer: rubySrv, + GitalyHookManager: hookManager, + TransactionManager: transactionManager, + StorageLocator: locator, + ClientPool: conns, + GitCmdFactory: gitCmdFactory, + Linguist: ling, + CatfileCache: catfileCache, + DiskCache: diskCache, + } + for _, c := range []starter.Config{ {Name: starter.Unix, Addr: cfg.SocketPath, HandoverOnUpgrade: true}, {Name: starter.Unix, Addr: cfg.GitalyInternalSocketPath(), HandoverOnUpgrade: false}, @@ -206,32 +220,23 @@ func run(cfg config.Cfg) error { continue } - var srv *grpc.Server + var grpcSrv *grpc.Server + var srpcSrv *streamrpc.Server if c.HandoverOnUpgrade { - srv, err = gitalyServerFactory.CreateExternal(c.IsSecure()) + grpcSrv, srpcSrv, err = gitalyServerFactory.CreateExternal(c.IsSecure()) if err != nil { return fmt.Errorf("create external gRPC server: %w", err) } } else { - srv, err = gitalyServerFactory.CreateInternal() + grpcSrv, srpcSrv, err = gitalyServerFactory.CreateInternal() if err != nil { return fmt.Errorf("create internal gRPC server: %w", err) } } - setup.RegisterAll(srv, &service.Dependencies{ - Cfg: cfg, - RubyServer: rubySrv, - GitalyHookManager: hookManager, - TransactionManager: transactionManager, - StorageLocator: locator, - ClientPool: conns, - GitCmdFactory: gitCmdFactory, - Linguist: ling, - CatfileCache: catfileCache, - DiskCache: diskCache, - }) - b.RegisterStarter(starter.New(c, srv)) + setup.RegisterAll(grpcSrv, deps) + setup.RegisterAll(srpcSrv, deps) + b.RegisterStarter(starter.New(c, grpcSrv)) } if addr := cfg.PrometheusListenAddr; addr != "" { diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 382d8fb931f..0f642c1d5ac 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -25,8 +25,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -86,30 +88,54 @@ func TestTLSSanity(t *testing.T) { func TestAuthFailures(t *testing.T) { testCases := []struct { - desc string - opts []grpc.DialOption - code codes.Code + desc string + creds credentials.PerRPCCredentials + code codes.Code + message string }{ - {desc: "no auth", opts: nil, code: codes.Unauthenticated}, { - desc: "invalid auth", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(brokenAuth{})}, - code: codes.Unauthenticated, + desc: "no auth", + creds: nil, + code: codes.Unauthenticated, + message: "rpc error: code = Unauthenticated desc = authentication required", }, { - desc: "wrong secret", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("foobar"))}, - code: codes.PermissionDenied, + desc: "invalid auth", + creds: brokenAuth{}, + code: codes.Unauthenticated, + message: "rpc error: code = Unauthenticated desc = authentication required", + }, + { + desc: "wrong secret", + creds: gitalyauth.RPCCredentialsV2("foobar"), + code: codes.PermissionDenied, + message: "rpc error: code = PermissionDenied desc = permission denied", }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - serverSocketPath := runServer(t, config.Cfg{Auth: auth.Config{Token: "quxbaz"}}) - connOpts := append(tc.opts, grpc.WithInsecure()) + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{ + Auth: auth.Config{Token: "quxbaz"}, + })) + serverSocketPath := runServer(t, cfg) + + // Make a healthcheck gRPC call + connOpts := []grpc.DialOption{grpc.WithInsecure()} + if tc.creds != nil { + connOpts = append(connOpts, grpc.WithPerRPCCredentials(tc.creds)) + } conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) t.Cleanup(func() { conn.Close() }) testhelper.RequireGrpcError(t, healthCheck(conn), tc.code) + + // // Make a streamRPC call + var callOpts []streamrpc.CallOption + if tc.creds != nil { + callOpts = append(callOpts, streamrpc.WithCredentials(tc.creds)) + } + _, _, err = checkStreamRPC(t, streamrpc.DialNet(serverSocketPath), repo, callOpts...) + require.EqualError(t, err, tc.message) }) } } @@ -119,40 +145,54 @@ func TestAuthSuccess(t *testing.T) { testCases := []struct { desc string - opts []grpc.DialOption + creds credentials.PerRPCCredentials required bool token string }{ {desc: "no auth, not required"}, { desc: "v2 correct auth, not required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))}, + creds: gitalyauth.RPCCredentialsV2(token), token: token, }, { desc: "v2 incorrect auth, not required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2("incorrect"))}, + creds: gitalyauth.RPCCredentialsV2("incorrect"), token: token, }, { desc: "v2 correct auth, required", - opts: []grpc.DialOption{grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token))}, + creds: gitalyauth.RPCCredentialsV2(token), token: token, required: true, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{ Auth: auth.Config{Token: tc.token, Transitioning: !tc.required}, })) serverSocketPath := runServer(t, cfg) - connOpts := append(tc.opts, grpc.WithInsecure()) + + // Make a healthcheck gRPC call + connOpts := []grpc.DialOption{grpc.WithInsecure()} + if tc.creds != nil { + connOpts = append(connOpts, grpc.WithPerRPCCredentials(tc.creds)) + } conn, err := dial(serverSocketPath, connOpts) require.NoError(t, err, tc.desc) t.Cleanup(func() { conn.Close() }) assert.NoError(t, healthCheck(conn), tc.desc) + + // // Make a streamRPC call + var callOpts []streamrpc.CallOption + if tc.creds != nil { + callOpts = append(callOpts, streamrpc.WithCredentials(tc.creds)) + } + in, out, err := checkStreamRPC(t, streamrpc.DialNet(serverSocketPath), repo, callOpts...) + require.NoError(t, err) + require.Equal(t, in, out) }) } } @@ -201,8 +241,10 @@ func runServer(t *testing.T, cfg config.Cfg) string { gitCmdFactory := git.NewExecCommandFactory(cfg) catfileCache := catfile.NewCache(cfg) diskCache := cache.New(cfg, locator) + streamRPCServer := streamrpc.NewServer() + gitalypb.RegisterTestStreamServiceServer(streamRPCServer, teststream.NewServer(locator)) - srv, err := New(false, cfg, testhelper.DiscardTestEntry(t), registry, diskCache) + srv, err := New(false, cfg, testhelper.DiscardTestEntry(t), registry, diskCache, streamRPCServer) require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ @@ -236,7 +278,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { conns := client.NewPool() t.Cleanup(func() { conns.Close() }) - srv, err := New(true, cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + srv, err := New(true, cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)), streamrpc.NewServer()) require.NoError(t, err) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index ffdf55c10dd..82b48f958e3 100644 --- a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -28,6 +28,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" "google.golang.org/grpc" @@ -74,6 +75,7 @@ func New( logrusEntry *log.Entry, registry *backchannel.Registry, cacheInvalidator diskcache.Invalidator, + streamRPCServer *streamrpc.Server, ) (*grpc.Server, error) { ctxTagOpts := []grpcmwtags.Option{ grpcmwtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), @@ -96,53 +98,63 @@ func New( }) } + serverStreamInterceptorChain := grpcmw.ChainStreamServer( + grpcmwtags.StreamServerInterceptor(ctxTagOpts...), + grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.StreamInterceptor, + grpcprometheus.StreamServerInterceptor, + commandstatshandler.StreamInterceptor, + grpcmwlogrus.StreamServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + sentryhandler.StreamLogHandler, + cancelhandler.Stream, // Should be below LogHandler + auth.StreamServerInterceptor(cfg.Auth), + lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + grpctracing.StreamServerTracingInterceptor(), + cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.StreamPanicHandler, + ) + + serverUnaryInterceptorChain := grpcmw.ChainUnaryServer( + grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), + grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler + metadatahandler.UnaryInterceptor, + grpcprometheus.UnaryServerInterceptor, + commandstatshandler.UnaryInterceptor, + grpcmwlogrus.UnaryServerInterceptor(logrusEntry, + grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), + grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), + sentryhandler.UnaryLogHandler, + cancelhandler.Unary, // Should be below LogHandler + auth.UnaryServerInterceptor(cfg.Auth), + lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued + grpctracing.UnaryServerTracingInterceptor(), + cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), + // Panic handler should remain last so that application panics will be + // converted to errors and logged + panichandler.UnaryPanicHandler, + ) + + streamRPCServer.UseInterceptor(serverUnaryInterceptorChain) + lm := listenmux.New(transportCredentials) lm.Register(backchannel.NewServerHandshaker( logrusEntry, registry, []grpc.DialOption{client.UnaryInterceptor()}, )) + lm.Register(streamrpc.NewServerHandshaker( + streamRPCServer, + gitalylog.Default(), + )) opts := []grpc.ServerOption{ grpc.Creds(lm), - grpc.StreamInterceptor(grpcmw.ChainStreamServer( - grpcmwtags.StreamServerInterceptor(ctxTagOpts...), - grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.StreamInterceptor, - grpcprometheus.StreamServerInterceptor, - commandstatshandler.StreamInterceptor, - grpcmwlogrus.StreamServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), - sentryhandler.StreamLogHandler, - cancelhandler.Stream, // Should be below LogHandler - auth.StreamServerInterceptor(cfg.Auth), - lh.StreamInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.StreamServerTracingInterceptor(), - cache.StreamInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.StreamPanicHandler, - )), - grpc.UnaryInterceptor(grpcmw.ChainUnaryServer( - grpcmwtags.UnaryServerInterceptor(ctxTagOpts...), - grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler - metadatahandler.UnaryInterceptor, - grpcprometheus.UnaryServerInterceptor, - commandstatshandler.UnaryInterceptor, - grpcmwlogrus.UnaryServerInterceptor(logrusEntry, - grpcmwlogrus.WithTimestampFormat(gitalylog.LogTimestampFormat), - grpcmwlogrus.WithMessageProducer(commandstatshandler.CommandStatsMessageProducer)), - sentryhandler.UnaryLogHandler, - cancelhandler.Unary, // Should be below LogHandler - auth.UnaryServerInterceptor(cfg.Auth), - lh.UnaryInterceptor(), // Should be below auth handler to prevent v2 hmac tokens from timing out while queued - grpctracing.UnaryServerTracingInterceptor(), - cache.UnaryInvalidator(cacheInvalidator, protoregistry.GitalyProtoPreregistered), - // Panic handler should remain last so that application panics will be - // converted to errors and logged - panichandler.UnaryPanicHandler, - )), + grpc.StreamInterceptor(serverStreamInterceptorChain), + grpc.UnaryInterceptor(serverUnaryInterceptorChain), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 20 * time.Second, PermitWithoutStream: true, diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 2b2d9e73dfd..bae8fad6374 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -133,26 +134,30 @@ func (s *GitalyServerFactory) GracefulStop() { } } -// CreateExternal creates a new external gRPC server. The external servers are closed +// CreateExternal creates a new external gRPC server and StreamRPC server. The external servers are closed // before the internal servers when gracefully shutting down. -func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, error) { - server, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator) +func (s *GitalyServerFactory) CreateExternal(secure bool) (*grpc.Server, *streamrpc.Server, error) { + streamRPCServer := streamrpc.NewServer() + grpcServer, err := New(secure, s.cfg, s.logger, s.registry, s.cacheInvalidator, streamRPCServer) if err != nil { - return nil, err + return nil, nil, err } - s.externalServers = append(s.externalServers, server) - return server, nil + s.externalServers = append(s.externalServers, grpcServer) + + return grpcServer, streamRPCServer, nil } -// CreateInternal creates a new internal gRPC server. Internal servers are closed +// CreateInternal creates a new internal gRPC server and StreamRPC server. Internal servers are closed // after the external ones when gracefully shutting down. -func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, error) { - server, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator) +func (s *GitalyServerFactory) CreateInternal() (*grpc.Server, *streamrpc.Server, error) { + streamRPCServer := streamrpc.NewServer() + grpcServer, err := New(false, s.cfg, s.logger, s.registry, s.cacheInvalidator, streamRPCServer) if err != nil { - return nil, err + return nil, nil, err } - s.internalServers = append(s.internalServers, server) - return server, nil + s.internalServers = append(s.internalServers, grpcServer) + + return grpcServer, streamRPCServer, nil } diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index c851c32d7c5..34c9c46c376 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -1,10 +1,14 @@ package server import ( + "bytes" "context" "crypto/tls" "crypto/x509" "errors" + "io" + "io/ioutil" + "math/rand" "net" "os" "testing" @@ -16,8 +20,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -30,71 +37,24 @@ func TestGitalyServerFactory(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - checkHealth := func(t *testing.T, sf *GitalyServerFactory, schema, addr string) healthpb.HealthClient { - t.Helper() - - var cc *grpc.ClientConn - if schema == starter.TLS { - listener, err := net.Listen(starter.TCP, addr) - require.NoError(t, err) - t.Cleanup(func() { listener.Close() }) - - srv, err := sf.CreateExternal(true) - require.NoError(t, err) - healthpb.RegisterHealthServer(srv, health.NewServer()) - go srv.Serve(listener) - - certPool, err := x509.SystemCertPool() - require.NoError(t, err) - - pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath) - require.True(t, certPool.AppendCertsFromPEM(pem)) - - creds := credentials.NewTLS(&tls.Config{ - RootCAs: certPool, - MinVersion: tls.VersionTLS12, - }) - - cc, err = grpc.DialContext(ctx, listener.Addr().String(), grpc.WithTransportCredentials(creds)) - require.NoError(t, err) - } else { - listener, err := net.Listen(schema, addr) - require.NoError(t, err) - t.Cleanup(func() { listener.Close() }) - - srv, err := sf.CreateExternal(false) - require.NoError(t, err) - healthpb.RegisterHealthServer(srv, health.NewServer()) - go srv.Serve(listener) - - endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String()) - require.NoError(t, err) - - cc, err = client.Dial(endpoint, nil) - require.NoError(t, err) - } - - t.Cleanup(func() { cc.Close() }) - - healthClient := healthpb.NewHealthClient(cc) + t.Run("insecure over TCP", func(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) + sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) - resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{}) - require.NoError(t, err) - require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status) - return healthClient - } + check(t, ctx, sf, cfg, repo, starter.TCP, "localhost:0") + }) - t.Run("insecure", func(t *testing.T) { - cfg := testcfg.Build(t) + t.Run("insecure over Unix Socket", func(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) - checkHealth(t, sf, starter.TCP, "localhost:0") + check(t, ctx, sf, cfg, repo, starter.Unix, testhelper.GetTemporaryGitalySocketFileName(t)) }) t.Run("secure", func(t *testing.T) { certFile, keyFile := testhelper.GenerateCerts(t) - cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{TLS: config.TLS{ + cfg, repo, _ := testcfg.BuildWithRepo(t, testcfg.WithBase(config.Cfg{TLS: config.TLS{ CertPath: certFile, KeyPath: keyFile, }})) @@ -102,20 +62,20 @@ func TestGitalyServerFactory(t *testing.T) { sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) - checkHealth(t, sf, starter.TLS, "localhost:0") + check(t, ctx, sf, cfg, repo, starter.TLS, "localhost:0") }) t.Run("all services must be stopped", func(t *testing.T) { - cfg := testcfg.Build(t) + cfg, repo, _ := testcfg.BuildWithRepo(t) sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) - tcpHealthClient := checkHealth(t, sf, starter.TCP, "localhost:0") + tcpHealthClient := check(t, ctx, sf, cfg, repo, starter.TCP, "localhost:0") socket := testhelper.GetTemporaryGitalySocketFileName(t) t.Cleanup(func() { require.NoError(t, os.RemoveAll(socket)) }) - socketHealthClient := checkHealth(t, sf, starter.Unix, socket) + socketHealthClient := check(t, ctx, sf, cfg, repo, starter.Unix, socket) sf.GracefulStop() // stops all started servers(listeners) @@ -185,7 +145,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { }{ { createServer: func() *grpc.Server { - server, err := sf.CreateInternal() + server, _, err := sf.CreateInternal() require.NoError(t, err) return server }, @@ -195,7 +155,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { }, { createServer: func() *grpc.Server { - server, err := sf.CreateExternal(false) + server, _, err := sf.CreateExternal(false) require.NoError(t, err) return server }, @@ -287,3 +247,117 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { // wait until the graceful shutdown completes <-shutdownCompeleted } + +func check(t *testing.T, ctx context.Context, sf *GitalyServerFactory, cfg config.Cfg, repo *gitalypb.Repository, schema, addr string) healthpb.HealthClient { + t.Helper() + + var grpcConn *grpc.ClientConn + var streamRPCDial streamrpc.DialFunc + + if schema == starter.TLS { + listener, err := net.Listen(starter.TCP, addr) + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + grpcSrv, srpcSrv, err := sf.CreateExternal(true) + require.NoError(t, err) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) + registerStreamRPCServers(t, srpcSrv, cfg) + go grpcSrv.Serve(listener) + + certPool, err := x509.SystemCertPool() + require.NoError(t, err) + + pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath) + require.True(t, certPool.AppendCertsFromPEM(pem)) + + tlsConf := &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS12, + } + creds := credentials.NewTLS(tlsConf) + + streamRPCDial = streamrpc.DialTLS(listener.Addr().String(), tlsConf) + grpcConn, err = grpc.DialContext(ctx, listener.Addr().String(), grpc.WithTransportCredentials(creds)) + require.NoError(t, err) + } else { + listener, err := net.Listen(schema, addr) + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + grpcSrv, srpcSrv, err := sf.CreateExternal(false) + require.NoError(t, err) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) + registerStreamRPCServers(t, srpcSrv, cfg) + go grpcSrv.Serve(listener) + + endpoint, err := starter.ComposeEndpoint(schema, listener.Addr().String()) + require.NoError(t, err) + + streamRPCDial = streamrpc.DialNet(endpoint) + grpcConn, err = client.Dial(endpoint, nil) + require.NoError(t, err) + } + + // Make a healthcheck gRPC call + t.Cleanup(func() { grpcConn.Close() }) + healthClient := healthpb.NewHealthClient(grpcConn) + + resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{}) + require.NoError(t, err) + require.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status) + + // Make a streamRPC call + in, out, err := checkStreamRPC(t, streamRPCDial, repo) + require.NoError(t, err) + require.Equal(t, in, out, "byte stream works") + + return healthClient +} + +func registerStreamRPCServers(t *testing.T, srv *streamrpc.Server, cfg config.Cfg) { + gitalypb.RegisterTestStreamServiceServer(srv, teststream.NewServer(config.NewLocator(cfg))) +} + +func checkStreamRPC(t *testing.T, dial streamrpc.DialFunc, repo *gitalypb.Repository, opts ...streamrpc.CallOption) ([]byte, []byte, error) { + ctx, cancel := testhelper.Context() + defer cancel() + + const size = 1024 * 1024 + + in := make([]byte, size) + _, err := rand.Read(in) + require.NoError(t, err) + + var out []byte + require.NotEqual(t, in, out) + + err = streamrpc.Call( + ctx, + dial, + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: repo, + Size: size, + }, + func(c net.Conn) error { + errC := make(chan error, 1) + go func() { + var err error + out, err = ioutil.ReadAll(c) + errC <- err + }() + + if _, err := io.Copy(c, bytes.NewReader(in)); err != nil { + return err + } + if err := <-errC; err != nil { + return err + } + + return nil + }, + opts..., + ) + return in, out, err +} diff --git a/internal/gitaly/service/repository/fork_test.go b/internal/gitaly/service/repository/fork_test.go index af3b440a47c..76542080a37 100644 --- a/internal/gitaly/service/repository/fork_test.go +++ b/internal/gitaly/service/repository/fork_test.go @@ -28,6 +28,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/gitlab" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -217,7 +218,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg, rubySrv *rubyserver.Server) s registry := backchannel.NewRegistry() locator := config.NewLocator(cfg) cache := cache.New(cfg, locator) - server, err := gserver.New(true, cfg, testhelper.DiscardTestEntry(t), registry, cache) + server, err := gserver.New(true, cfg, testhelper.DiscardTestEntry(t), registry, cache, streamrpc.NewServer()) require.NoError(t, err) listener, addr := testhelper.GetLocalhostListener(t) diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index e92aeac59dd..827906f3a0a 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -21,6 +21,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/server" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/smarthttp" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/teststream" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/wiki" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" @@ -51,7 +52,7 @@ var ( ) ) -// RegisterAll will register all the known gRPC services on the provided gRPC service instance. +// RegisterAll will register all the known gRPC + StreamRPC services func RegisterAll(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterBlobServiceServer(srv, blob.NewServer( deps.GetCfg(), @@ -144,6 +145,10 @@ func RegisterAll(srv grpc.ServiceRegistrar, deps *service.Dependencies) { healthpb.RegisterHealthServer(srv, health.NewServer()) + gitalypb.RegisterTestStreamServiceServer(srv, teststream.NewServer( + deps.GetLocator(), + )) + if gs, ok := srv.(*grpc.Server); ok { reflection.Register(gs) grpcprometheus.Register(gs) diff --git a/internal/gitaly/service/teststream/server.go b/internal/gitaly/service/teststream/server.go new file mode 100644 index 00000000000..51256b22679 --- /dev/null +++ b/internal/gitaly/service/teststream/server.go @@ -0,0 +1,37 @@ +package teststream + +import ( + "context" + "io" + + "gitlab.com/gitlab-org/gitaly/v14/internal/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +type server struct { + gitalypb.UnimplementedTestStreamServiceServer + locator storage.Locator +} + +func (s *server) TestStream(ctx context.Context, request *gitalypb.TestStreamRequest) (*emptypb.Empty, error) { + if _, err := s.locator.GetRepoPath(request.Repository); err != nil { + return nil, err + } + + c, err := streamrpc.AcceptConnection(ctx) + if err != nil { + return nil, err + } + + _, err = io.CopyN(c, c, request.Size) + return nil, err +} + +// NewServer creates a new instance of a grpc ServerServiceServer +func NewServer(locator storage.Locator) gitalypb.TestStreamServiceServer { + return &server{ + locator: locator, + } +} diff --git a/internal/gitaly/service/teststream/server_test.go b/internal/gitaly/service/teststream/server_test.go new file mode 100644 index 00000000000..8051cf78758 --- /dev/null +++ b/internal/gitaly/service/teststream/server_test.go @@ -0,0 +1,115 @@ +package teststream + +import ( + "bytes" + "io" + "io/ioutil" + "math/rand" + "net" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +func TestTestStreamPingPong(t *testing.T) { + const size = 1024 * 1024 + + addr, repo := runGitalyServer(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + in := make([]byte, size) + _, err := rand.Read(in) + require.NoError(t, err) + + var out []byte + require.NotEqual(t, in, out) + require.NoError(t, streamrpc.Call( + ctx, + streamrpc.DialNet(addr), + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: repo, + Size: size, + }, + func(c net.Conn) error { + errC := make(chan error, 1) + go func() { + var err error + out, err = ioutil.ReadAll(c) + errC <- err + }() + + if _, err := io.Copy(c, bytes.NewReader(in)); err != nil { + return err + } + if err := <-errC; err != nil { + return err + } + + return nil + }, + )) + + require.Equal(t, in, out, "byte stream works") +} + +func TestTestStreamPingPongWithInvalidRepo(t *testing.T) { + addr, repo := runGitalyServer(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + err := streamrpc.Call( + ctx, + streamrpc.DialNet(addr), + "/gitaly.TestStreamService/TestStream", + &gitalypb.TestStreamRequest{ + Repository: &gitalypb.Repository{ + StorageName: repo.StorageName, + RelativePath: "@hashed/94/00/notexist.git", + GlRepository: repo.GlRepository, + GlProjectPath: repo.GlProjectPath, + }, + Size: 1024 * 1024, + }, + func(c net.Conn) error { + panic("Should not reach here") + }, + ) + + require.Error(t, err) + require.Contains( + t, err.Error(), + "rpc error: code = NotFound desc = GetRepoPath: not a git repository", + ) +} + +func runGitalyServer(t *testing.T) (string, *gitalypb.Repository) { + t.Helper() + testhelper.Configure() + + cfg, repo, _ := testcfg.BuildWithRepo(t) + + addr := testserver.RunGitalyServer( + t, cfg, nil, + func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { + gitalypb.RegisterTestStreamServiceServer(srv, NewServer(deps.GetLocator())) + }, + // TODO: At the moment, stream RPC doesn't work well with Praefect, + // hence we have to disable Praefect. We can remove this option after + // https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 is + // done + testserver.WithDisablePraefect(), + ) + + return addr, repo +} diff --git a/internal/streamrpc/handshaker.go b/internal/streamrpc/handshaker.go new file mode 100644 index 00000000000..54548af86d8 --- /dev/null +++ b/internal/streamrpc/handshaker.go @@ -0,0 +1,137 @@ +package streamrpc + +import ( + "crypto/tls" + "fmt" + "net" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter" + "google.golang.org/grpc/credentials" +) + +// The magic bytes used for classification by listenmux +var magicBytes = []byte("streamrpc00") + +// DialNet lets Call initiate unencrypted connections. They tend to be used +// with Gitaly's listenmux multiplexer only. After the connection is +// established, streamrpc's 11-byte magic bytes are written into the wire. +// Listemmux peeks into these magic bytes and redirects the request to +// StreamRPC server. +// Please visit internal/listenmux/mux.go for more information +func DialNet(address string) DialFunc { + return func(t time.Duration) (net.Conn, error) { + endpoint, err := starter.ParseEndpoint(address) + if err != nil { + return nil, err + } + + // Dial-only deadline + deadline := time.Now().Add(t) + + dialer := &net.Dialer{Deadline: deadline} + conn, err := dialer.Dial(endpoint.Name, endpoint.Addr) + if err != nil { + return nil, err + } + + if err = conn.SetDeadline(deadline); err != nil { + return nil, err + } + // Write the magic bytes on the connection so the server knows we're + // about to initiate a multiplexing session. + if _, err := conn.Write(magicBytes); err != nil { + return nil, fmt.Errorf("streamrpc client: write backchannel magic bytes: %w", err) + } + + // Reset deadline of tls connection for later stages + if err = conn.SetDeadline(time.Time{}); err != nil { + return nil, err + } + + return conn, nil + } +} + +// DialTLS lets Call initiate TLS connections. Similar to DialNet, the +// connections are used for listenmux multiplexer. There are 3 steps involving: +// - TCP handshake +// - TLS handshake +// - Write streamrpc magic bytes +func DialTLS(address string, cfg *tls.Config) DialFunc { + return func(t time.Duration) (net.Conn, error) { + // Dial-only deadline + deadline := time.Now().Add(t) + + dialer := &net.Dialer{Deadline: deadline} + tlsConn, err := tls.DialWithDialer(dialer, "tcp", address, cfg) + if err != nil { + return nil, err + } + + err = tlsConn.SetDeadline(deadline) + if err != nil { + return nil, err + } + // Write the magic bytes on the connection so the server knows we're + // about to initiate a multiplexing session. + if _, err := tlsConn.Write(magicBytes); err != nil { + return nil, fmt.Errorf("streamrpc client: write backchannel magic bytes: %w", err) + } + + // Reset deadline of tls connection for later stages + if err = tlsConn.SetDeadline(time.Time{}); err != nil { + return nil, err + } + + return tlsConn, nil + } +} + +// ServerHandshaker implements the server side handshake of the multiplexed connection. +type ServerHandshaker struct { + server *Server + logger logrus.FieldLogger +} + +// NewServerHandshaker returns an implementation of streamrpc server +// handshaker. The provided TransportCredentials are handshaked prior to +// initializing the multiplexing session. This handshaker Gitaly's unary server +// interceptors into the interceptor chain of input StreamRPC server. +func NewServerHandshaker(server *Server, logger logrus.FieldLogger) *ServerHandshaker { + return &ServerHandshaker{ + server: server, + logger: logger, + } +} + +// Magic is used by listenmux to retrieve the magic string for +// streamrpc connections. +func (s *ServerHandshaker) Magic() string { return string(magicBytes) } + +// Handshake "steals" the request from Gitaly's main gRPC server during +// connection handshaking phase. Listenmux depends on the first 11-byte magic +// bytes sent by the client, and invoke StreamRPC handshaker accordingly. The +// request is then handled by stream RPC server, and skipped by Gitaly gRPC +// server. +func (s *ServerHandshaker) Handshake(conn net.Conn, authInfo credentials.AuthInfo) (net.Conn, credentials.AuthInfo, error) { + if err := conn.SetDeadline(time.Time{}); err != nil { + return nil, nil, err + } + + go func() { + if err := s.server.Handle(conn); err != nil { + s.logger.WithError(err).Error("streamrpc: handle call") + } + }() + // At this point, the connection is already closed. If the + // TransportCredentials continues its code path, gRPC constructs a HTTP2 + // server transport to handle the connection. Eventually, it fails and logs + // several warnings and errors even though the stream RPC call is + // successful. + // Fortunately, gRPC has credentials.ErrConnDispatched, indicating that the + // connection is already dispatched out of gRPC. gRPC should leave it alone + // and exit in peace. + return nil, nil, credentials.ErrConnDispatched +} diff --git a/internal/streamrpc/rpc_test.go b/internal/streamrpc/rpc_test.go index 6692227808e..3fe5ef529cd 100644 --- a/internal/streamrpc/rpc_test.go +++ b/internal/streamrpc/rpc_test.go @@ -188,18 +188,20 @@ func TestCall_serverMiddleware(t *testing.T) { ) interceptorDone := make(chan struct{}) + server := NewServer() + server.UseInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer close(interceptorDone) + middlewareMethod = info.FullMethod + receivedField = req.(*testpb.StreamRequest).StringField + if md, ok := metadata.FromIncomingContext(ctx); ok { + receivedValues = md[testKey] + } + return handler(ctx, req) + }) dial := startServer( t, - NewServer(WithServerInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - defer close(interceptorDone) - middlewareMethod = info.FullMethod - receivedField = req.(*testpb.StreamRequest).StringField - if md, ok := metadata.FromIncomingContext(ctx); ok { - receivedValues = md[testKey] - } - return handler(ctx, req) - })), + server, func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { _, err := AcceptConnection(ctx) return nil, err @@ -222,15 +224,11 @@ func TestCall_serverMiddleware(t *testing.T) { } func TestCall_serverMiddlewareReject(t *testing.T) { - dial := startServer( - t, - NewServer(WithServerInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - return nil, errors.New("middleware says no") - })), - func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { - panic("never reached") - }, - ) + server := NewServer() + server.UseInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return nil, errors.New("middleware says no") + }) + dial := startServer(t, server, func(ctx context.Context, in *testpb.StreamRequest) (*emptypb.Empty, error) { panic("never reached") }) err := Call( context.Background(), diff --git a/internal/streamrpc/server.go b/internal/streamrpc/server.go index 502764fd47f..2c4771af5fb 100644 --- a/internal/streamrpc/server.go +++ b/internal/streamrpc/server.go @@ -30,11 +30,6 @@ type method struct { // options to NewServer. type ServerOption func(*Server) -// WithServerInterceptor adds a unary gRPC server interceptor. -func WithServerInterceptor(interceptor grpc.UnaryServerInterceptor) ServerOption { - return func(s *Server) { s.interceptor = interceptor } -} - // NewServer returns a new StreamRPC server. You can pass the result to // grpc-go RegisterFooServer functions. func NewServer(opts ...ServerOption) *Server { @@ -60,6 +55,12 @@ func (s *Server) RegisterService(sd *grpc.ServiceDesc, impl interface{}) { } } +// UseInterceptor adds a unary gRPC server interceptor for the StreamRPC +// server to use. +func (s *Server) UseInterceptor(interceptor grpc.UnaryServerInterceptor) { + s.interceptor = interceptor +} + // Handle handles an incoming network connection with the StreamRPC // protocol. It is intended to be called from a net.Listener.Accept loop // (or something equivalent). diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 5b878a7f3d8..399aed5371a 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -210,20 +210,21 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi deps := gsd.createDependencies(t, cfg, rubyServer) t.Cleanup(func() { gsd.conns.Close() }) - srv, err := server.NewGitalyServerFactory( + grpcSrv, srpcSrv, err := server.NewGitalyServerFactory( cfg, gsd.logger.WithField("test", t.Name()), deps.GetBackchannelRegistry(), deps.GetDiskCache(), ).CreateExternal(cfg.TLS.CertPath != "" && cfg.TLS.KeyPath != "") require.NoError(t, err) - t.Cleanup(srv.Stop) + t.Cleanup(grpcSrv.Stop) - registrar(srv, deps) - if _, found := srv.GetServiceInfo()["grpc.health.v1.Health"]; !found { + registrar(grpcSrv, deps) + registrar(srpcSrv, deps) + if _, found := grpcSrv.GetServiceInfo()["grpc.health.v1.Health"]; !found { // we should register health service as it is used for the health checks // praefect service executes periodically (and on the bootstrap step) - healthpb.RegisterHealthServer(srv, health.NewServer()) + healthpb.RegisterHealthServer(grpcSrv, health.NewServer()) } // listen on internal socket @@ -243,7 +244,7 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi internalListener, err := net.Listen("unix", cfg.GitalyInternalSocketPath()) require.NoError(t, err) - go srv.Serve(internalListener) + go grpcSrv.Serve(internalListener) } var listener net.Listener @@ -266,9 +267,9 @@ func runGitaly(t testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, regi addr = "unix://" + serverSocketPath } - go srv.Serve(listener) + go grpcSrv.Serve(listener) - return srv, addr, gsd.disablePraefect + return grpcSrv, addr, gsd.disablePraefect } type gitalyServerDeps struct { diff --git a/proto/go/gitalypb/protolist.go b/proto/go/gitalypb/protolist.go index a15916f7044..9d26e24a784 100644 --- a/proto/go/gitalypb/protolist.go +++ b/proto/go/gitalypb/protolist.go @@ -23,6 +23,7 @@ var GitalyProtos = []string{ "shared.proto", "smarthttp.proto", "ssh.proto", + "teststream.proto", "transaction.proto", "wiki.proto", } diff --git a/proto/go/gitalypb/teststream.pb.go b/proto/go/gitalypb/teststream.pb.go new file mode 100644 index 00000000000..d6a68abce3e --- /dev/null +++ b/proto/go/gitalypb/teststream.pb.go @@ -0,0 +1,173 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.17.3 +// source: teststream.proto + +package gitalypb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type TestStreamRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` + Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` +} + +func (x *TestStreamRequest) Reset() { + *x = TestStreamRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_teststream_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestStreamRequest) ProtoMessage() {} + +func (x *TestStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_teststream_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestStreamRequest.ProtoReflect.Descriptor instead. +func (*TestStreamRequest) Descriptor() ([]byte, []int) { + return file_teststream_proto_rawDescGZIP(), []int{0} +} + +func (x *TestStreamRequest) GetRepository() *Repository { + if x != nil { + return x.Repository + } + return nil +} + +func (x *TestStreamRequest) GetSize() int64 { + if x != nil { + return x.Size + } + return 0 +} + +var File_teststream_proto protoreflect.FileDescriptor + +var file_teststream_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x74, 0x65, 0x73, 0x74, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x0a, 0x6c, 0x69, 0x6e, 0x74, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x61, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, + 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, + 0x73, 0x69, 0x7a, 0x65, 0x32, 0x5c, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x47, 0x0a, 0x0a, 0x54, 0x65, 0x73, + 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, + 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_teststream_proto_rawDescOnce sync.Once + file_teststream_proto_rawDescData = file_teststream_proto_rawDesc +) + +func file_teststream_proto_rawDescGZIP() []byte { + file_teststream_proto_rawDescOnce.Do(func() { + file_teststream_proto_rawDescData = protoimpl.X.CompressGZIP(file_teststream_proto_rawDescData) + }) + return file_teststream_proto_rawDescData +} + +var file_teststream_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_teststream_proto_goTypes = []interface{}{ + (*TestStreamRequest)(nil), // 0: gitaly.TestStreamRequest + (*Repository)(nil), // 1: gitaly.Repository + (*emptypb.Empty)(nil), // 2: google.protobuf.Empty +} +var file_teststream_proto_depIdxs = []int32{ + 1, // 0: gitaly.TestStreamRequest.repository:type_name -> gitaly.Repository + 0, // 1: gitaly.TestStreamService.TestStream:input_type -> gitaly.TestStreamRequest + 2, // 2: gitaly.TestStreamService.TestStream:output_type -> google.protobuf.Empty + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_teststream_proto_init() } +func file_teststream_proto_init() { + if File_teststream_proto != nil { + return + } + file_lint_proto_init() + file_shared_proto_init() + if !protoimpl.UnsafeEnabled { + file_teststream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestStreamRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_teststream_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_teststream_proto_goTypes, + DependencyIndexes: file_teststream_proto_depIdxs, + MessageInfos: file_teststream_proto_msgTypes, + }.Build() + File_teststream_proto = out.File + file_teststream_proto_rawDesc = nil + file_teststream_proto_goTypes = nil + file_teststream_proto_depIdxs = nil +} diff --git a/proto/go/gitalypb/teststream_grpc.pb.go b/proto/go/gitalypb/teststream_grpc.pb.go new file mode 100644 index 00000000000..e8b4c5fcb68 --- /dev/null +++ b/proto/go/gitalypb/teststream_grpc.pb.go @@ -0,0 +1,102 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package gitalypb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// TestStreamServiceClient is the client API for TestStreamService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TestStreamServiceClient interface { + TestStream(ctx context.Context, in *TestStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type testStreamServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTestStreamServiceClient(cc grpc.ClientConnInterface) TestStreamServiceClient { + return &testStreamServiceClient{cc} +} + +func (c *testStreamServiceClient) TestStream(ctx context.Context, in *TestStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/gitaly.TestStreamService/TestStream", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TestStreamServiceServer is the server API for TestStreamService service. +// All implementations must embed UnimplementedTestStreamServiceServer +// for forward compatibility +type TestStreamServiceServer interface { + TestStream(context.Context, *TestStreamRequest) (*emptypb.Empty, error) + mustEmbedUnimplementedTestStreamServiceServer() +} + +// UnimplementedTestStreamServiceServer must be embedded to have forward compatible implementations. +type UnimplementedTestStreamServiceServer struct { +} + +func (UnimplementedTestStreamServiceServer) TestStream(context.Context, *TestStreamRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestStream not implemented") +} +func (UnimplementedTestStreamServiceServer) mustEmbedUnimplementedTestStreamServiceServer() {} + +// UnsafeTestStreamServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TestStreamServiceServer will +// result in compilation errors. +type UnsafeTestStreamServiceServer interface { + mustEmbedUnimplementedTestStreamServiceServer() +} + +func RegisterTestStreamServiceServer(s grpc.ServiceRegistrar, srv TestStreamServiceServer) { + s.RegisterService(&TestStreamService_ServiceDesc, srv) +} + +func _TestStreamService_TestStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TestStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestStreamServiceServer).TestStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.TestStreamService/TestStream", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestStreamServiceServer).TestStream(ctx, req.(*TestStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// TestStreamService_ServiceDesc is the grpc.ServiceDesc for TestStreamService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TestStreamService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "gitaly.TestStreamService", + HandlerType: (*TestStreamServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "TestStream", + Handler: _TestStreamService_TestStream_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "teststream.proto", +} diff --git a/proto/teststream.proto b/proto/teststream.proto new file mode 100644 index 00000000000..734047887f7 --- /dev/null +++ b/proto/teststream.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package gitaly; + +option go_package = "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"; + +import "lint.proto"; +import "shared.proto"; +import "google/protobuf/empty.proto"; + +service TestStreamService { + rpc TestStream(TestStreamRequest) returns (google.protobuf.Empty) { + option (op_type) = { + op: ACCESSOR + }; + } +} + +message TestStreamRequest { + Repository repository = 1 [(target_repository)=true]; + int64 size = 2; +} diff --git a/ruby/proto/gitaly.rb b/ruby/proto/gitaly.rb index 9c80cea63b4..9cfff81cf0f 100644 --- a/ruby/proto/gitaly.rb +++ b/ruby/proto/gitaly.rb @@ -37,6 +37,8 @@ require 'gitaly/smarthttp_services_pb' require 'gitaly/ssh_services_pb' +require 'gitaly/teststream_services_pb' + require 'gitaly/transaction_services_pb' require 'gitaly/wiki_services_pb' diff --git a/ruby/proto/gitaly/teststream_pb.rb b/ruby/proto/gitaly/teststream_pb.rb new file mode 100644 index 00000000000..d75050f048f --- /dev/null +++ b/ruby/proto/gitaly/teststream_pb.rb @@ -0,0 +1,20 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: teststream.proto + +require 'google/protobuf' + +require 'lint_pb' +require 'shared_pb' +require 'google/protobuf/empty_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do + add_file("teststream.proto", :syntax => :proto3) do + add_message "gitaly.TestStreamRequest" do + optional :repository, :message, 1, "gitaly.Repository" + optional :size, :int64, 2 + end + end +end + +module Gitaly + TestStreamRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.TestStreamRequest").msgclass +end diff --git a/ruby/proto/gitaly/teststream_services_pb.rb b/ruby/proto/gitaly/teststream_services_pb.rb new file mode 100644 index 00000000000..d617668f5e6 --- /dev/null +++ b/ruby/proto/gitaly/teststream_services_pb.rb @@ -0,0 +1,22 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: teststream.proto for package 'gitaly' + +require 'grpc' +require 'teststream_pb' + +module Gitaly + module TestStreamService + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'gitaly.TestStreamService' + + rpc :TestStream, Gitaly::TestStreamRequest, Google::Protobuf::Empty + end + + Stub = Service.rpc_stub_class + end +end -- GitLab From f67b28bce05b926717638ab9575eefa9d3ed0ec6 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Tue, 29 Jun 2021 16:24:36 +0000 Subject: [PATCH 3/7] Require trailing flush in pktline.EachSidebandPacket This changes the behavior of pktline.EachSidebandPacket to expect a trailing flush packet. The old behavior was to expect the stream to end on a packet boundary. This is part of https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/463, where we need to be able to signal "end of stream" in a natural way and we cannot use EOF. There is only one call site for EachSidebandPacket, and that call site (PackObjectsHook) only consumes byte streams produced by the same Gitaly process. In other words, this change is safe in spite of being incompatible with the old behavior, because it can never be exposed to the old behavior. --- internal/git/pktline/pktline.go | 16 ++++++++++++---- .../{pkt_line_test.go => pktline_test.go} | 14 +++++++++++++- internal/gitaly/service/hook/pack_objects.go | 4 ++++ 3 files changed, 29 insertions(+), 5 deletions(-) rename internal/git/pktline/{pkt_line_test.go => pktline_test.go} (94%) diff --git a/internal/git/pktline/pktline.go b/internal/git/pktline/pktline.go index 665d6cbde64..6ccb9d46475 100644 --- a/internal/git/pktline/pktline.go +++ b/internal/git/pktline/pktline.go @@ -161,13 +161,17 @@ type errNotSideband struct{ pkt string } func (err *errNotSideband) Error() string { return fmt.Sprintf("invalid sideband packet: %q", err.pkt) } -// EachSidebandPacket iterates over a side-band-64k pktline stream. For -// each packet, it will call fn with the band ID and the packet. Fn must -// not retain the packet. +// EachSidebandPacket iterates over a side-band-64k pktline stream until +// it reaches a flush packet. For each packet, it will call fn with the +// band ID and the packet. Fn must not retain the packet. func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { scanner := NewScanner(r) for scanner.Scan() { + if IsFlush(scanner.Bytes()) { + return nil + } + data := Data(scanner.Bytes()) if len(data) == 0 { return &errNotSideband{scanner.Text()} @@ -177,5 +181,9 @@ func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { } } - return scanner.Err() + if err := scanner.Err(); err != nil { + return err + } + + return io.ErrUnexpectedEOF } diff --git a/internal/git/pktline/pkt_line_test.go b/internal/git/pktline/pktline_test.go similarity index 94% rename from internal/git/pktline/pkt_line_test.go rename to internal/git/pktline/pktline_test.go index 32694a7e0ab..dc29d3b0e0f 100644 --- a/internal/git/pktline/pkt_line_test.go +++ b/internal/git/pktline/pktline_test.go @@ -279,16 +279,23 @@ func TestEachSidebandPacket(t *testing.T) { }{ { desc: "empty", + in: "0000", out: map[byte]string{}, }, { desc: "empty with failing callback: callback does not run", + in: "0000", out: map[byte]string{}, callback: func(byte, []byte) error { panic("oh no") }, }, { desc: "valid stream", - in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz0000", + out: map[byte]string{0: "foo", 1: "bar", 254: "qux", 255: "baz"}, + }, + { + desc: "valid stream trailing garbage", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz0000 garbage!!", out: map[byte]string{0: "foo", 1: "bar", 254: "qux", 255: "baz"}, }, { @@ -297,6 +304,11 @@ func TestEachSidebandPacket(t *testing.T) { callback: func(byte, []byte) error { return callbackError }, err: callbackError, }, + { + desc: "valid stream except missing flush", + in: "0008\x00foo0008\x01bar0008\xfequx0008\xffbaz", + err: io.ErrUnexpectedEOF, + }, { desc: "interrupted stream", in: "ffff\x10hello world!!", diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index 82c396503b4..f1be31c1d18 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -204,6 +204,10 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb return fmt.Errorf("git-pack-objects: stderr: %q err: %w", stderrBuf.String(), err) } + if err := pktline.WriteFlush(w); err != nil { + return err + } + return nil } -- GitLab From 9fbfe7841ccd88549ee42e38243f7844e7521f5d Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Jul 2021 16:05:31 +0000 Subject: [PATCH 4/7] Add pktline.SingleBandReader This is helps us receive a stdin stream over a socket, including stdin EOF, while the socket remains open. EOF is signalled by a Git flush packet "0000". --- internal/git/pktline/pktline.go | 37 +++++++++++++++++ internal/git/pktline/pktline_test.go | 59 ++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/internal/git/pktline/pktline.go b/internal/git/pktline/pktline.go index 6ccb9d46475..54cea3e81bb 100644 --- a/internal/git/pktline/pktline.go +++ b/internal/git/pktline/pktline.go @@ -10,6 +10,8 @@ import ( "io" "strconv" "sync" + + "gitlab.com/gitlab-org/gitaly/v14/streamio" ) const ( @@ -187,3 +189,38 @@ func EachSidebandPacket(r io.Reader, fn func(byte, []byte) error) error { return io.ErrUnexpectedEOF } + +// SingleBandReader unwraps a flush-terminated sideband-64k stream. It +// expects a sequence of sideband packets all for the same band. The +// returned reader will return EOF when it encounters a flush packet. +// Anything else in the input stream will result in a read error. +func SingleBandReader(r io.Reader, band byte) io.Reader { + scanner := NewScanner(r) + + return streamio.NewReader(func() ([]byte, error) { + if !scanner.Scan() { + return nil, io.ErrUnexpectedEOF + } + data := scanner.Bytes() + + if IsFlush(data) { + return nil, io.EOF + } + + if len(data) < 5 { + return nil, &errNotSideband{string(data)} + } + + if b := data[4]; b != band { + return nil, errUnexpectedSideband(b) + } + + return data[5:], nil + }) +} + +type errUnexpectedSideband byte + +func (b errUnexpectedSideband) Error() string { + return fmt.Sprintf("unexpected band: %d", b) +} diff --git a/internal/git/pktline/pktline_test.go b/internal/git/pktline/pktline_test.go index dc29d3b0e0f..dd26ab875ce 100644 --- a/internal/git/pktline/pktline_test.go +++ b/internal/git/pktline/pktline_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "io" + "io/ioutil" "math" "math/rand" "strings" @@ -346,3 +347,61 @@ func TestEachSidebandPacket(t *testing.T) { }) } } + +func TestSingleBandReader(t *testing.T) { + testCases := []struct { + desc string + in string + out string + err error + }{ + { + desc: "empty", + in: "0000", + out: "", + }, + { + desc: "valid stream", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz0000", + out: "foobarquxbaz", + }, + { + desc: "valid stream trailing garbage", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz0000 garbage!!", + out: "foobarquxbaz", + }, + { + desc: "valid stream except missing flush", + in: "0008\x00foo0008\x00bar0008\x00qux0008\x00baz", + err: io.ErrUnexpectedEOF, + }, + { + desc: "interrupted stream", + in: "ffff\x00hello world!!", + err: io.ErrUnexpectedEOF, + }, + { + desc: "stream without band", + in: "0004", + err: &errNotSideband{pkt: "0004"}, + }, + { + desc: "stream with wrong band", + in: "0005\x01", + err: errUnexpectedSideband(1), + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + out, err := ioutil.ReadAll(SingleBandReader(strings.NewReader(tc.in), 0)) + if tc.err != nil { + require.Equal(t, tc.err, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc.out, string(out)) + }) + } +} -- GitLab From 4e1b3b8774981991903118eabd792f67eee31a97 Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Jul 2021 16:08:39 +0000 Subject: [PATCH 5/7] Add PackObjectsHookStream proto definition --- proto/go/gitalypb/hook.pb.go | 416 ++++++++++++++++---------- proto/go/gitalypb/hook_grpc.pb.go | 40 ++- proto/hook.proto | 12 + ruby/proto/gitaly/hook_pb.rb | 6 + ruby/proto/gitaly/hook_services_pb.rb | 1 + 5 files changed, 310 insertions(+), 165 deletions(-) diff --git a/proto/go/gitalypb/hook.pb.go b/proto/go/gitalypb/hook.pb.go index a25bf025484..080765af7ce 100644 --- a/proto/go/gitalypb/hook.pb.go +++ b/proto/go/gitalypb/hook.pb.go @@ -9,6 +9,7 @@ package gitalypb import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" reflect "reflect" sync "sync" ) @@ -735,149 +736,219 @@ func (x *PackObjectsHookResponse) GetStderr() []byte { return nil } +type PackObjectsHookStreamRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` + // args contains the arguments passed to the pack-objects hook, without the leading "git" + Args []string `protobuf:"bytes,2,rep,name=args,proto3" json:"args,omitempty"` +} + +func (x *PackObjectsHookStreamRequest) Reset() { + *x = PackObjectsHookStreamRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_hook_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PackObjectsHookStreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PackObjectsHookStreamRequest) ProtoMessage() {} + +func (x *PackObjectsHookStreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_hook_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PackObjectsHookStreamRequest.ProtoReflect.Descriptor instead. +func (*PackObjectsHookStreamRequest) Descriptor() ([]byte, []int) { + return file_hook_proto_rawDescGZIP(), []int{10} +} + +func (x *PackObjectsHookStreamRequest) GetRepository() *Repository { + if x != nil { + return x.Repository + } + return nil +} + +func (x *PackObjectsHookStreamRequest) GetArgs() []string { + if x != nil { + return x.Args + } + return nil +} + var File_hook_proto protoreflect.FileDescriptor var file_hook_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x68, 0x6f, 0x6f, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x1a, 0x0a, 0x6c, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc6, - 0x01, 0x0a, 0x15, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, - 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, - 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, - 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, - 0x10, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x67, 0x69, 0x74, 0x50, 0x75, 0x73, 0x68, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x50, 0x72, 0x65, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, - 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, - 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0xc7, 0x01, 0x0a, 0x16, 0x50, 0x6f, 0x73, 0x74, 0x52, - 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, - 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, - 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, - 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, - 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, - 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, - 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x75, - 0x73, 0x68, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, - 0x52, 0x0e, 0x67, 0x69, 0x74, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x22, 0x7e, 0x0a, 0x17, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, - 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, - 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, - 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x22, 0xce, 0x01, 0x0a, 0x11, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, - 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, - 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, - 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, - 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x03, 0x72, 0x65, 0x66, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x6c, 0x64, 0x5f, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x6c, 0x64, 0x56, - 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x22, 0x79, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, - 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, - 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x9e, 0x02, 0x0a, - 0x1f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, - 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, - 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, - 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, - 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, - 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, - 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, - 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x43, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, + 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc6, 0x01, 0x0a, 0x15, + 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, + 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, + 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, + 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, + 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, + 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, + 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x67, 0x69, + 0x74, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x05, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x67, 0x69, 0x74, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, + 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, + 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, + 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x22, 0xc7, 0x01, 0x0a, 0x16, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, + 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, + 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, + 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, + 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, + 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, + 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, + 0x64, 0x69, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x5f, + 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x67, + 0x69, 0x74, 0x50, 0x75, 0x73, 0x68, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7e, 0x0a, + 0x17, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, + 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, + 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0xce, 0x01, + 0x0a, 0x11, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, + 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, + 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, + 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, + 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, + 0x65, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x66, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x03, 0x72, 0x65, 0x66, 0x12, 0x1b, 0x0a, 0x09, 0x6f, 0x6c, 0x64, 0x5f, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6f, 0x6c, 0x64, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x79, + 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, + 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, + 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, + 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x9e, 0x02, 0x0a, 0x1f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x31, 0x0a, 0x05, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x52, 0x45, 0x50, 0x41, 0x52, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x54, 0x45, 0x44, 0x10, 0x01, - 0x12, 0x0b, 0x0a, 0x07, 0x41, 0x42, 0x4f, 0x52, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0x87, 0x01, - 0x0a, 0x20, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, - 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, - 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x7c, 0x0a, 0x16, 0x50, 0x61, 0x63, 0x6b, 0x4f, - 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, - 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, - 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x61, - 0x72, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, - 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, - 0x73, 0x74, 0x64, 0x69, 0x6e, 0x22, 0x49, 0x0a, 0x17, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, - 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, - 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, - 0x32, 0xf4, 0x03, 0x0a, 0x0b, 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x5b, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, - 0x6f, 0x6b, 0x12, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, - 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5e, 0x0a, - 0x0f, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, - 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, - 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4d, 0x0a, - 0x0a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x19, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x30, 0x01, 0x12, 0x79, 0x0a, 0x18, - 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x27, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x28, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, + 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x33, 0x0a, 0x15, 0x65, 0x6e, 0x76, 0x69, 0x72, + 0x6f, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x65, 0x6e, 0x76, 0x69, 0x72, 0x6f, 0x6e, 0x6d, + 0x65, 0x6e, 0x74, 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x14, 0x0a, 0x05, + 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, + 0x69, 0x6e, 0x12, 0x43, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x2d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5e, 0x0a, 0x0f, 0x50, 0x61, 0x63, 0x6b, 0x4f, - 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, - 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, - 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x31, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x52, 0x45, 0x50, 0x41, 0x52, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, + 0x0a, 0x09, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, + 0x07, 0x41, 0x42, 0x4f, 0x52, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0x87, 0x01, 0x0a, 0x20, 0x52, + 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, + 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, + 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, + 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x22, 0x7c, 0x0a, 0x16, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, + 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, + 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, + 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x14, 0x0a, 0x05, + 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, + 0x69, 0x6e, 0x22, 0x49, 0x0a, 0x17, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, + 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, + 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x22, 0x6c, 0x0a, + 0x1c, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, + 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x32, 0xd3, 0x04, 0x0a, 0x0b, + 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5b, 0x0a, 0x0e, 0x50, + 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1d, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, + 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x72, 0x65, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, + 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5e, 0x0a, 0x0f, 0x50, 0x6f, 0x73, 0x74, + 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1e, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, + 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4d, 0x0a, 0x0a, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, + 0x97, 0x28, 0x02, 0x08, 0x02, 0x30, 0x01, 0x12, 0x79, 0x0a, 0x18, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, + 0x6f, 0x6f, 0x6b, 0x12, 0x27, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, + 0x30, 0x01, 0x12, 0x5e, 0x0a, 0x0f, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, + 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x12, 0x1e, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, + 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, + 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, + 0x30, 0x01, 0x12, 0x5d, 0x0a, 0x15, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, + 0x73, 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x24, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, + 0x48, 0x6f, 0x6f, 0x6b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, + 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -893,7 +964,7 @@ func file_hook_proto_rawDescGZIP() []byte { } var file_hook_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_hook_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_hook_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_hook_proto_goTypes = []interface{}{ (ReferenceTransactionHookRequest_State)(0), // 0: gitaly.ReferenceTransactionHookRequest.State (*PreReceiveHookRequest)(nil), // 1: gitaly.PreReceiveHookRequest @@ -906,35 +977,40 @@ var file_hook_proto_goTypes = []interface{}{ (*ReferenceTransactionHookResponse)(nil), // 8: gitaly.ReferenceTransactionHookResponse (*PackObjectsHookRequest)(nil), // 9: gitaly.PackObjectsHookRequest (*PackObjectsHookResponse)(nil), // 10: gitaly.PackObjectsHookResponse - (*Repository)(nil), // 11: gitaly.Repository - (*ExitStatus)(nil), // 12: gitaly.ExitStatus + (*PackObjectsHookStreamRequest)(nil), // 11: gitaly.PackObjectsHookStreamRequest + (*Repository)(nil), // 12: gitaly.Repository + (*ExitStatus)(nil), // 13: gitaly.ExitStatus + (*emptypb.Empty)(nil), // 14: google.protobuf.Empty } var file_hook_proto_depIdxs = []int32{ - 11, // 0: gitaly.PreReceiveHookRequest.repository:type_name -> gitaly.Repository - 12, // 1: gitaly.PreReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 2: gitaly.PostReceiveHookRequest.repository:type_name -> gitaly.Repository - 12, // 3: gitaly.PostReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 4: gitaly.UpdateHookRequest.repository:type_name -> gitaly.Repository - 12, // 5: gitaly.UpdateHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 6: gitaly.ReferenceTransactionHookRequest.repository:type_name -> gitaly.Repository + 12, // 0: gitaly.PreReceiveHookRequest.repository:type_name -> gitaly.Repository + 13, // 1: gitaly.PreReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 2: gitaly.PostReceiveHookRequest.repository:type_name -> gitaly.Repository + 13, // 3: gitaly.PostReceiveHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 4: gitaly.UpdateHookRequest.repository:type_name -> gitaly.Repository + 13, // 5: gitaly.UpdateHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 6: gitaly.ReferenceTransactionHookRequest.repository:type_name -> gitaly.Repository 0, // 7: gitaly.ReferenceTransactionHookRequest.state:type_name -> gitaly.ReferenceTransactionHookRequest.State - 12, // 8: gitaly.ReferenceTransactionHookResponse.exit_status:type_name -> gitaly.ExitStatus - 11, // 9: gitaly.PackObjectsHookRequest.repository:type_name -> gitaly.Repository - 1, // 10: gitaly.HookService.PreReceiveHook:input_type -> gitaly.PreReceiveHookRequest - 3, // 11: gitaly.HookService.PostReceiveHook:input_type -> gitaly.PostReceiveHookRequest - 5, // 12: gitaly.HookService.UpdateHook:input_type -> gitaly.UpdateHookRequest - 7, // 13: gitaly.HookService.ReferenceTransactionHook:input_type -> gitaly.ReferenceTransactionHookRequest - 9, // 14: gitaly.HookService.PackObjectsHook:input_type -> gitaly.PackObjectsHookRequest - 2, // 15: gitaly.HookService.PreReceiveHook:output_type -> gitaly.PreReceiveHookResponse - 4, // 16: gitaly.HookService.PostReceiveHook:output_type -> gitaly.PostReceiveHookResponse - 6, // 17: gitaly.HookService.UpdateHook:output_type -> gitaly.UpdateHookResponse - 8, // 18: gitaly.HookService.ReferenceTransactionHook:output_type -> gitaly.ReferenceTransactionHookResponse - 10, // 19: gitaly.HookService.PackObjectsHook:output_type -> gitaly.PackObjectsHookResponse - 15, // [15:20] is the sub-list for method output_type - 10, // [10:15] is the sub-list for method input_type - 10, // [10:10] is the sub-list for extension type_name - 10, // [10:10] is the sub-list for extension extendee - 0, // [0:10] is the sub-list for field type_name + 13, // 8: gitaly.ReferenceTransactionHookResponse.exit_status:type_name -> gitaly.ExitStatus + 12, // 9: gitaly.PackObjectsHookRequest.repository:type_name -> gitaly.Repository + 12, // 10: gitaly.PackObjectsHookStreamRequest.repository:type_name -> gitaly.Repository + 1, // 11: gitaly.HookService.PreReceiveHook:input_type -> gitaly.PreReceiveHookRequest + 3, // 12: gitaly.HookService.PostReceiveHook:input_type -> gitaly.PostReceiveHookRequest + 5, // 13: gitaly.HookService.UpdateHook:input_type -> gitaly.UpdateHookRequest + 7, // 14: gitaly.HookService.ReferenceTransactionHook:input_type -> gitaly.ReferenceTransactionHookRequest + 9, // 15: gitaly.HookService.PackObjectsHook:input_type -> gitaly.PackObjectsHookRequest + 11, // 16: gitaly.HookService.PackObjectsHookStream:input_type -> gitaly.PackObjectsHookStreamRequest + 2, // 17: gitaly.HookService.PreReceiveHook:output_type -> gitaly.PreReceiveHookResponse + 4, // 18: gitaly.HookService.PostReceiveHook:output_type -> gitaly.PostReceiveHookResponse + 6, // 19: gitaly.HookService.UpdateHook:output_type -> gitaly.UpdateHookResponse + 8, // 20: gitaly.HookService.ReferenceTransactionHook:output_type -> gitaly.ReferenceTransactionHookResponse + 10, // 21: gitaly.HookService.PackObjectsHook:output_type -> gitaly.PackObjectsHookResponse + 14, // 22: gitaly.HookService.PackObjectsHookStream:output_type -> google.protobuf.Empty + 17, // [17:23] is the sub-list for method output_type + 11, // [11:17] is the sub-list for method input_type + 11, // [11:11] is the sub-list for extension type_name + 11, // [11:11] is the sub-list for extension extendee + 0, // [0:11] is the sub-list for field type_name } func init() { file_hook_proto_init() } @@ -1065,6 +1141,18 @@ func file_hook_proto_init() { return nil } } + file_hook_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PackObjectsHookStreamRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1072,7 +1160,7 @@ func file_hook_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_hook_proto_rawDesc, NumEnums: 1, - NumMessages: 10, + NumMessages: 11, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/go/gitalypb/hook_grpc.pb.go b/proto/go/gitalypb/hook_grpc.pb.go index 7efe8249f25..a494223ecd2 100644 --- a/proto/go/gitalypb/hook_grpc.pb.go +++ b/proto/go/gitalypb/hook_grpc.pb.go @@ -7,6 +7,7 @@ import ( grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" ) // This is a compile-time assertion to ensure that this generated file @@ -26,6 +27,7 @@ type HookServiceClient interface { // uploadpack.packObjectsHook mechanism. It generates a stream of packed // Git objects. PackObjectsHook(ctx context.Context, opts ...grpc.CallOption) (HookService_PackObjectsHookClient, error) + PackObjectsHookStream(ctx context.Context, in *PackObjectsHookStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type hookServiceClient struct { @@ -192,6 +194,15 @@ func (x *hookServicePackObjectsHookClient) Recv() (*PackObjectsHookResponse, err return m, nil } +func (c *hookServiceClient) PackObjectsHookStream(ctx context.Context, in *PackObjectsHookStreamRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/gitaly.HookService/PackObjectsHookStream", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // HookServiceServer is the server API for HookService service. // All implementations must embed UnimplementedHookServiceServer // for forward compatibility @@ -204,6 +215,7 @@ type HookServiceServer interface { // uploadpack.packObjectsHook mechanism. It generates a stream of packed // Git objects. PackObjectsHook(HookService_PackObjectsHookServer) error + PackObjectsHookStream(context.Context, *PackObjectsHookStreamRequest) (*emptypb.Empty, error) mustEmbedUnimplementedHookServiceServer() } @@ -226,6 +238,9 @@ func (UnimplementedHookServiceServer) ReferenceTransactionHook(HookService_Refer func (UnimplementedHookServiceServer) PackObjectsHook(HookService_PackObjectsHookServer) error { return status.Errorf(codes.Unimplemented, "method PackObjectsHook not implemented") } +func (UnimplementedHookServiceServer) PackObjectsHookStream(context.Context, *PackObjectsHookStreamRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method PackObjectsHookStream not implemented") +} func (UnimplementedHookServiceServer) mustEmbedUnimplementedHookServiceServer() {} // UnsafeHookServiceServer may be embedded to opt out of forward compatibility for this service. @@ -364,13 +379,36 @@ func (x *hookServicePackObjectsHookServer) Recv() (*PackObjectsHookRequest, erro return m, nil } +func _HookService_PackObjectsHookStream_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PackObjectsHookStreamRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HookServiceServer).PackObjectsHookStream(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.HookService/PackObjectsHookStream", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HookServiceServer).PackObjectsHookStream(ctx, req.(*PackObjectsHookStreamRequest)) + } + return interceptor(ctx, in, info, handler) +} + // HookService_ServiceDesc is the grpc.ServiceDesc for HookService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var HookService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.HookService", HandlerType: (*HookServiceServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "PackObjectsHookStream", + Handler: _HookService_PackObjectsHookStream_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "PreReceiveHook", diff --git a/proto/hook.proto b/proto/hook.proto index 71dd75cd64b..efde832be47 100644 --- a/proto/hook.proto +++ b/proto/hook.proto @@ -6,6 +6,7 @@ option go_package = "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"; import "lint.proto"; import "shared.proto"; +import "google/protobuf/empty.proto"; service HookService { rpc PreReceiveHook(stream PreReceiveHookRequest) returns (stream PreReceiveHookResponse) { @@ -36,6 +37,11 @@ service HookService { op: ACCESSOR }; } + rpc PackObjectsHookStream(PackObjectsHookStreamRequest) returns (google.protobuf.Empty) { + option (op_type) = { + op: ACCESSOR + }; + } } message PreReceiveHookRequest { @@ -110,3 +116,9 @@ message PackObjectsHookResponse { // stderr contains progress messages (such as "Enumerating objects ...") bytes stderr = 2; } + +message PackObjectsHookStreamRequest { + Repository repository = 1 [(target_repository)=true]; + // args contains the arguments passed to the pack-objects hook, without the leading "git" + repeated string args = 2; +} diff --git a/ruby/proto/gitaly/hook_pb.rb b/ruby/proto/gitaly/hook_pb.rb index 20c8cb8b189..8a8d4b87cdb 100644 --- a/ruby/proto/gitaly/hook_pb.rb +++ b/ruby/proto/gitaly/hook_pb.rb @@ -5,6 +5,7 @@ require 'google/protobuf' require 'lint_pb' require 'shared_pb' +require 'google/protobuf/empty_pb' Google::Protobuf::DescriptorPool.generated_pool.build do add_file("hook.proto", :syntax => :proto3) do add_message "gitaly.PreReceiveHookRequest" do @@ -66,6 +67,10 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :stdout, :bytes, 1 optional :stderr, :bytes, 2 end + add_message "gitaly.PackObjectsHookStreamRequest" do + optional :repository, :message, 1, "gitaly.Repository" + repeated :args, :string, 2 + end end end @@ -81,4 +86,5 @@ module Gitaly ReferenceTransactionHookResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.ReferenceTransactionHookResponse").msgclass PackObjectsHookRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.PackObjectsHookRequest").msgclass PackObjectsHookResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.PackObjectsHookResponse").msgclass + PackObjectsHookStreamRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.PackObjectsHookStreamRequest").msgclass end diff --git a/ruby/proto/gitaly/hook_services_pb.rb b/ruby/proto/gitaly/hook_services_pb.rb index 196ef344b52..66c6ea4a52c 100644 --- a/ruby/proto/gitaly/hook_services_pb.rb +++ b/ruby/proto/gitaly/hook_services_pb.rb @@ -22,6 +22,7 @@ module Gitaly # uploadpack.packObjectsHook mechanism. It generates a stream of packed # Git objects. rpc :PackObjectsHook, stream(Gitaly::PackObjectsHookRequest), stream(Gitaly::PackObjectsHookResponse) + rpc :PackObjectsHookStream, Gitaly::PackObjectsHookStreamRequest, Google::Protobuf::Empty end Stub = Service.rpc_stub_class -- GitLab From 459c107cf6a0e12efca231f10508cd49be34450f Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Fri, 16 Jul 2021 16:11:25 +0000 Subject: [PATCH 6/7] Add PackObjectsHookStream RPC Changelog: added --- internal/gitaly/service/hook/pack_objects.go | 90 +++++--- .../gitaly/service/hook/pack_objects_test.go | 200 +++++++++++++++++- 2 files changed, 256 insertions(+), 34 deletions(-) diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index f1be31c1d18..753c2f7c939 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -21,8 +22,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" + "google.golang.org/protobuf/types/known/emptypb" ) var ( @@ -59,7 +62,32 @@ func (s *server) PackObjectsHook(stream gitalypb.HookService_PackObjectsHookServ return helper.ErrInvalidArgumentf("invalid pack-objects command: %v: %w", firstRequest.Args, err) } - if err := s.packObjectsHook(stream, firstRequest, args); err != nil { + stdin := streamio.NewReader(func() ([]byte, error) { + resp, err := stream.Recv() + return resp.GetStdin(), err + }) + + output := func(r io.Reader) (int64, error) { + var n int64 + err := pktline.EachSidebandPacket(r, func(band byte, data []byte) error { + resp := &gitalypb.PackObjectsHookResponse{} + + switch band { + case bandStdout: + resp.Stdout = data + case bandStderr: + resp.Stderr = data + default: + return fmt.Errorf("invalid side band: %d", band) + } + + n += int64(len(data)) + return stream.Send(resp) + }) + return n, err + } + + if err := s.packObjectsHook(stream.Context(), firstRequest.Repository, firstRequest, args, stdin, output); err != nil { return helper.ErrInternal(err) } @@ -67,19 +95,18 @@ func (s *server) PackObjectsHook(stream gitalypb.HookService_PackObjectsHookServ } const ( + bandStdin = 0 bandStdout = 1 bandStderr = 2 ) -func (s *server) packObjectsHook(stream gitalypb.HookService_PackObjectsHookServer, firstRequest *gitalypb.PackObjectsHookRequest, args *packObjectsArgs) error { - ctx := stream.Context() - +func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, reqHash proto.Message, args *packObjectsArgs, stdinReader io.Reader, output func(io.Reader) (int64, error)) error { h := sha256.New() - if err := (&jsonpb.Marshaler{}).Marshal(h, firstRequest); err != nil { + if err := (&jsonpb.Marshaler{}).Marshal(h, reqHash); err != nil { return err } - stdin, err := bufferStdin(stream, h) + stdin, err := bufferStdin(stdinReader, h) if err != nil { return err } @@ -99,7 +126,7 @@ func (s *server) packObjectsHook(stream gitalypb.HookService_PackObjectsHookServ key := hex.EncodeToString(h.Sum(nil)) r, created, err := s.packObjectsCache.FindOrCreate(key, func(w io.Writer) error { - return s.runPackObjects(ctx, w, firstRequest.Repository, args, stdin, key) + return s.runPackObjects(ctx, w, repo, args, stdin, key) }) if err != nil { return err @@ -122,21 +149,8 @@ func (s *server) packObjectsHook(stream gitalypb.HookService_PackObjectsHookServ packObjectsServedBytes.Add(float64(servedBytes)) }() - if err := pktline.EachSidebandPacket(r, func(band byte, data []byte) error { - resp := &gitalypb.PackObjectsHookResponse{} - - switch band { - case bandStdout: - resp.Stdout = data - case bandStderr: - resp.Stderr = data - default: - return fmt.Errorf("invalid side band: %d", band) - } - - servedBytes += int64(len(data)) - return stream.Send(resp) - }); err != nil { + servedBytes, err = output(r) + if err != nil { return err } @@ -278,7 +292,7 @@ func (p *packObjectsArgs) subcmd() git.SubCmd { return sc } -func bufferStdin(stream gitalypb.HookService_PackObjectsHookServer, h hash.Hash) (_ io.ReadCloser, err error) { +func bufferStdin(r io.Reader, h hash.Hash) (_ io.ReadCloser, err error) { f, err := ioutil.TempFile("", "PackObjectsHook-stdin") if err != nil { return nil, err @@ -293,15 +307,7 @@ func bufferStdin(stream gitalypb.HookService_PackObjectsHookServer, h hash.Hash) return nil, err } - stdin := io.TeeReader( - streamio.NewReader(func() ([]byte, error) { - resp, err := stream.Recv() - return resp.GetStdin(), err - }), - h, - ) - - _, err = io.Copy(f, stdin) + _, err = io.Copy(f, io.TeeReader(r, h)) if err != nil { return nil, err } @@ -323,3 +329,23 @@ func (cw *countingWriter) Write(p []byte) (int, error) { cw.N += int64(n) return n, err } + +func (s *server) PackObjectsHookStream(ctx context.Context, req *gitalypb.PackObjectsHookStreamRequest) (*emptypb.Empty, error) { + if req.GetRepository() == nil { + return nil, helper.ErrInvalidArgument(errors.New("repository is empty")) + } + + args, err := parsePackObjectsArgs(req.Args) + if err != nil { + return nil, helper.ErrInvalidArgumentf("invalid pack-objects command: %v: %w", req.Args, err) + } + + c, err := streamrpc.AcceptConnection(ctx) + if err != nil { + return nil, err + } + + stdin := pktline.SingleBandReader(c, bandStdin) + output := func(r io.Reader) (int64, error) { return io.Copy(c, r) } + return nil, s.packObjectsHook(ctx, req.Repository, req, args, stdin, output) +} diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index b6d3c987922..7fd0bbb76b9 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "net" "testing" "time" @@ -11,8 +12,10 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/streamcache" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -228,9 +231,12 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) { func TestServer_PackObjectsHook_usesCache(t *testing.T) { cfg, repo, repoPath := cfgWithCache(t) - tlc := &streamcache.TestLoggingCache{} + tlc := &streamcache.TestLoggingCache{Cache: streamcache.New( + cfg.PackObjectsCache.Dir, + cfg.PackObjectsCache.MaxAge.Duration(), + testhelper.DiscardTestLogger(t), + )} serverSocketPath := runHooksServer(t, cfg, []serverOption{func(s *server) { - tlc.Cache = s.packObjectsCache s.packObjectsCache = tlc }}) @@ -288,3 +294,193 @@ func TestServer_PackObjectsHook_usesCache(t *testing.T) { require.NoError(t, entries[i].Err) } } + +func TestServer_PackObjectsHookStream(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg, repo, repoPath := cfgWithCache(t) + + testCases := []struct { + desc string + stdin string + args []string + }{ + { + desc: "clone 1 branch", + stdin: "3dd08961455abf80ef9115f4afdc1c6f968b503c\n--not\n\n", + args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, + }, + { + desc: "shallow clone 1 branch", + stdin: "--shallow 1e292f8fedd741b75372e19097c76d327140c312\n1e292f8fedd741b75372e19097c76d327140c312\n--not\n\n", + args: []string{"--shallow-file", "", "pack-objects", "--revs", "--thin", "--stdout", "--shallow", "--progress", "--delta-base-offset", "--include-tag"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + logger, hook := test.NewNullLogger() + serverSocketPath := runHooksServer(t, cfg, nil, + testserver.WithLogger(logger), + testserver.WithDisablePraefect(), // TODO remove after https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 + ) + + var packets []string + require.NoError(t, streamrpc.Call( + ctx, + streamrpc.DialNet(serverSocketPath), + "/gitaly.HookService/PackObjectsHookStream", + &gitalypb.PackObjectsHookStreamRequest{ + Repository: repo, + Args: tc.args, + }, + func(c net.Conn) error { + sw := pktline.NewSidebandWriter(c) + if _, err := io.WriteString(sw.Writer(0), tc.stdin); err != nil { + return err + } + if err := pktline.WriteFlush(c); err != nil { + return err + } + + scanner := pktline.NewScanner(c) + for scanner.Scan() { + packets = append(packets, scanner.Text()) + } + return scanner.Err() + }, + )) + + require.NotEmpty(t, packets) + last := len(packets) - 1 + require.Equal(t, "0000", packets[last]) + + var packdata []byte + for _, pkt := range packets[:last] { + if len(pkt) < 5 { + t.Fatalf("invalid packet: %q", pkt) + } + + switch band := pkt[4]; band { + case 1: + packdata = append(packdata, pkt[5:]...) + case 2: + default: + t.Fatalf("unexpected band: %d", band) + } + } + + gittest.ExecStream( + t, + cfg, + bytes.NewReader(packdata), + "-C", repoPath, "index-pack", "--stdin", "--fix-thin", + ) + + for _, msg := range []string{"served bytes", "generated bytes"} { + t.Run(msg, func(t *testing.T) { + var entry *logrus.Entry + for _, e := range hook.AllEntries() { + if e.Message == msg { + entry = e + } + } + + require.NotNil(t, entry) + require.NotEmpty(t, entry.Data["cache_key"]) + require.Greater(t, entry.Data["bytes"], int64(0)) + }) + } + }) + } +} + +func TestServer_PackObjectsHookStream_errorSuppressesFlush(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg, repo, _ := cfgWithCache(t) + serverSocketPath := runHooksServer(t, cfg, nil, + testserver.WithDisablePraefect(), // TODO remove after https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 + ) + + var packets []string + require.NoError(t, streamrpc.Call( + ctx, + streamrpc.DialNet(serverSocketPath), + "/gitaly.HookService/PackObjectsHookStream", + &gitalypb.PackObjectsHookStreamRequest{ + Repository: repo, + Args: []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"}, + }, + func(c net.Conn) error { + // Send input that will cause git pack-objects to fail + if _, err := pktline.WriteString(c, "\x00bad request"); err != nil { + return err + } + if err := pktline.WriteFlush(c); err != nil { + return err + } + + scanner := pktline.NewScanner(c) + for scanner.Scan() { + packets = append(packets, scanner.Text()) + } + return scanner.Err() + }, + )) + + // Because git pack-objects failed, there should be no flush packet. + require.NotEmpty(t, packets) + for _, pkt := range packets { + require.False(t, pktline.IsFlush([]byte(pkt))) + } +} + +func TestServer_PackObjectsHookStream_invalidArgument(t *testing.T) { + cfg, repo, _ := testcfg.BuildWithRepo(t) + serverSocketPath := runHooksServer(t, cfg, nil, + testserver.WithDisablePraefect(), // TODO remove after https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1127 + ) + + ctx, cancel := testhelper.Context() + defer cancel() + + testCases := []struct { + desc string + req *gitalypb.PackObjectsHookStreamRequest + msg string + }{ + { + desc: "empty", + req: &gitalypb.PackObjectsHookStreamRequest{}, + msg: "repository is empty", + }, + { + desc: "repo, no args", + req: &gitalypb.PackObjectsHookStreamRequest{Repository: repo}, + msg: "invalid pack-objects command", + }, + { + desc: "repo, bad args", + req: &gitalypb.PackObjectsHookStreamRequest{Repository: repo, Args: []string{"rm", "-rf"}}, + msg: "invalid pack-objects command", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + err := streamrpc.Call( + ctx, + streamrpc.DialNet(serverSocketPath), + "/gitaly.HookService/PackObjectsHookStream", + tc.req, + func(c net.Conn) error { panic("never reached") }, + ) + + require.Error(t, err) + require.Contains(t, err.Error(), tc.msg) + }) + } +} -- GitLab From 40caf51e4f2350d729432dad501545144ef5625d Mon Sep 17 00:00:00 2001 From: Jacob Vosmaer Date: Tue, 20 Jul 2021 17:26:31 +0000 Subject: [PATCH 7/7] Add PackObjectsHookStream client This is feature-flagged and won't be called by default. Changelog: other --- cmd/gitaly-hooks/hooks.go | 60 ++++++++++++++++++- cmd/gitaly-hooks/hooks_test.go | 46 +++++++++++--- internal/git/hooks_options.go | 4 ++ .../metadata/featureflag/feature_flags.go | 3 + 4 files changed, 101 insertions(+), 12 deletions(-) diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go index 03b33ddb8a6..1b21a4d46cd 100644 --- a/cmd/gitaly-hooks/hooks.go +++ b/cmd/gitaly-hooks/hooks.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "net" "os" "strings" @@ -13,6 +14,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" @@ -21,6 +23,7 @@ import ( gitalylog "gitlab.com/gitlab-org/gitaly/v14/internal/log" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/stream" + "gitlab.com/gitlab-org/gitaly/v14/internal/streamrpc" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" "gitlab.com/gitlab-org/labkit/tracing" @@ -339,9 +342,17 @@ func packObjectsHook(ctx context.Context, payload git.HooksPayload, hookClient g fixedArgs = append(fixedArgs, fixFilterQuoteBug(a)) } - if err := handlePackObjects(ctx, hookClient, payload.Repo, fixedArgs); err != nil { - logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHook RPC failed") - return 1, nil + switch os.Getenv("GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM") { + case "1": + if err := handlePackObjectsStream(ctx, payload, fixedArgs); err != nil { + logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHookStream RPC failed") + return 1, nil + } + default: + if err := handlePackObjects(ctx, hookClient, payload.Repo, fixedArgs); err != nil { + logger.Logger().WithFields(logrus.Fields{"args": args}).WithError(err).Error("PackObjectsHook RPC failed") + return 1, nil + } } return 0, nil @@ -406,3 +417,46 @@ type nopExitStatus struct { } func (nopExitStatus) GetExitStatus() *gitalypb.ExitStatus { return nil } + +func handlePackObjectsStream(ctx context.Context, payload git.HooksPayload, args []string) error { + req := &gitalypb.PackObjectsHookStreamRequest{ + Repository: payload.Repo, + Args: args, + } + + callback := func(c net.Conn) error { + if _, err := io.Copy( + pktline.NewSidebandWriter(c).Writer(0), + os.Stdin, + ); err != nil { + return err + } + if err := pktline.WriteFlush(c); err != nil { + return err + } + + return pktline.EachSidebandPacket(c, func(band byte, data []byte) error { + var err error + switch band { + case 1: + _, err = os.Stdout.Write(data) + case 2: + _, err = os.Stderr.Write(data) + default: + err = fmt.Errorf("unexpected side band: %d", band) + } + return err + }) + } + + return streamrpc.Call( + ctx, + streamrpc.DialNet("unix://"+payload.InternalSocket), + "/gitaly.HookService/PackObjectsHookStream", + req, + callback, + streamrpc.WithCredentials( + gitalyauth.RPCCredentialsV2(payload.InternalSocketToken), + ), + ) +} diff --git a/cmd/gitaly-hooks/hooks_test.go b/cmd/gitaly-hooks/hooks_test.go index 424dd60a3e2..c5ccdadce4b 100644 --- a/cmd/gitaly-hooks/hooks_test.go +++ b/cmd/gitaly-hooks/hooks_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/command" @@ -33,6 +34,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" ) type glHookValues struct { @@ -567,8 +569,8 @@ func TestCheckBadCreds(t *testing.T) { require.Regexp(t, `Checking GitLab API access: .* level=error msg="Internal API error" .* error="authorization failed" method=GET status=401 url="http://127.0.0.1:[0-9]+/api/v4/internal/check"\nFAIL`, stdout.String()) } -func runHookServiceServer(t *testing.T, cfg config.Cfg) { - runHookServiceWithGitlabClient(t, cfg, gitlab.NewMockClient()) +func runHookServiceServer(t *testing.T, cfg config.Cfg, serverOpts ...testserver.GitalyServerOpt) { + runHookServiceWithGitlabClient(t, cfg, gitlab.NewMockClient(), serverOpts...) } type featureFlagAsserter struct { @@ -607,12 +609,18 @@ func (svc featureFlagAsserter) PackObjectsHook(stream gitalypb.HookService_PackO return svc.wrapped.PackObjectsHook(stream) } -func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client) { +func (svc featureFlagAsserter) PackObjectsHookStream(ctx context.Context, req *gitalypb.PackObjectsHookStreamRequest) (*emptypb.Empty, error) { + svc.assertFlags(ctx) + return svc.wrapped.PackObjectsHookStream(ctx, req) +} + +func runHookServiceWithGitlabClient(t *testing.T, cfg config.Cfg, gitlabClient gitlab.Client, serverOpts ...testserver.GitalyServerOpt) { + serverOpts = append(serverOpts, testserver.WithGitLabClient(gitlabClient)) testserver.RunGitalyServer(t, cfg, nil, func(srv grpc.ServiceRegistrar, deps *service.Dependencies) { gitalypb.RegisterHookServiceServer(srv, featureFlagAsserter{ t: t, wrapped: hook.NewServer(deps.GetCfg(), deps.GetHookManager(), deps.GetGitCmdFactory()), }) - }, testserver.WithGitLabClient(gitlabClient)) + }, serverOpts...) } func requireContainsOnce(t *testing.T, s string, contains string) { @@ -666,25 +674,45 @@ func TestGitalyHooksPackObjects(t *testing.T) { testCases := []struct { desc string extraArgs []string + extraEnv []string + method string }{ - {desc: "regular clone"}, - {desc: "shallow clone", extraArgs: []string{"--depth=1"}}, - {desc: "partial clone", extraArgs: []string{"--filter=blob:none"}}, + {desc: "regular clone", method: "PackObjectsHook"}, + {desc: "shallow clone", extraArgs: []string{"--depth=1"}, method: "PackObjectsHook"}, + {desc: "partial clone", extraArgs: []string{"--filter=blob:none"}, method: "PackObjectsHook"}, + { + desc: "regular clone StreamRPC", + extraEnv: []string{"GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM=1"}, + method: "PackObjectsHookStream", + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - runHookServiceServer(t, cfg) + logger, hook := test.NewNullLogger() + runHookServiceServer(t, cfg, testserver.WithLogger(logger)) tempDir := testhelper.TempDir(t) args := append(baseArgs[1:], tc.extraArgs...) args = append(args, repoPath, tempDir) cmd := exec.Command(baseArgs[0], args...) - cmd.Env = env + cmd.Env = append(env, tc.extraEnv...) cmd.Stderr = os.Stderr require.NoError(t, cmd.Run()) + + foundMethod := false + for _, e := range hook.AllEntries() { + t.Log(e.Data) + if e.Data["grpc.service"] != "gitaly.HookService" { + continue + } + + require.Equal(t, tc.method, e.Data["grpc.method"]) + foundMethod = true + } + require.True(t, foundMethod) }) } } diff --git a/internal/git/hooks_options.go b/internal/git/hooks_options.go index 7ad3ff9f70a..b8b2789046f 100644 --- a/internal/git/hooks_options.go +++ b/internal/git/hooks_options.go @@ -66,6 +66,10 @@ func WithPackObjectsHookEnv(ctx context.Context, repo *gitalypb.Repository, cfg Value: filepath.Join(cfg.BinDir, "gitaly-hooks"), }) + if featureflag.PackObjectsHookStream.IsEnabled(ctx) { + cc.env = append(cc.env, "GITALY_HOOKS_PACK_OBJECTS_HOOK_STREAM=1") + } + return nil } } diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go index 8b9175f6678..55e0eb7206e 100644 --- a/internal/metadata/featureflag/feature_flags.go +++ b/internal/metadata/featureflag/feature_flags.go @@ -24,6 +24,8 @@ var ( FindAllTagsPipeline = FeatureFlag{Name: "find_all_tags_pipeline", OnByDefault: false} // TxRemoveRepository enables transactionsal voting for the RemoveRepository RPC. TxRemoveRepository = FeatureFlag{Name: "tx_remove_repository", OnByDefault: false} + // GitalyHooksPackObjectsHookStream enables StreamRPC in 'gitaly-hooks git pack-objects'. + PackObjectsHookStream = FeatureFlag{Name: "pack_objects_hook_stream", OnByDefault: false} ) // All includes all feature flags. @@ -36,4 +38,5 @@ var All = []FeatureFlag{ ReplicateRepositoryDirectFetch, FindAllTagsPipeline, TxRemoveRepository, + PackObjectsHookStream, } -- GitLab