diff --git a/changelogs/unreleased/jc-inject-gitaly-time.yml b/changelogs/unreleased/jc-inject-gitaly-time.yml new file mode 100644 index 0000000000000000000000000000000000000000..4f99d5fbdf0f19c2bf9adf8f304a67e5d31fbd2e --- /dev/null +++ b/changelogs/unreleased/jc-inject-gitaly-time.yml @@ -0,0 +1,5 @@ +--- +title: Add middleware package for capturing proxy time +merge_request: 1681 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index a0868f36d8b224fa202d456b865be2cb1c0f2672..28c7d4c13f0995c727a3c24f71b8d1695bd2451e 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -39,6 +39,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" "gitlab.com/gitlab-org/gitaly/internal/config/sentry" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" @@ -113,6 +114,8 @@ func configure() (config.Config, error) { logger.WithField("address", conf.PrometheusListenAddr).Info("Starting prometheus listener") conf.Prometheus.Configure() + metrics.RegisterProxyTime(conf) + go func() { if err := monitoring.Serve( monitoring.WithListenerAddress(conf.PrometheusListenAddr), @@ -154,9 +157,10 @@ func run(cfgs []starter.Config, conf config.Config) error { } var ( + tt = proxytime.NewTrailerTracker() // top level server dependencies ds = datastore.NewInMemory(conf) - coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, tt, protoregistry.GitalyProtoFileDescriptors...) repl = praefect.NewReplMgr( "default", logger, @@ -164,7 +168,7 @@ func run(cfgs []starter.Config, conf config.Config) error { clientConnections, praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) - srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) + srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf, tt) serverErrors = make(chan error, 1) ) diff --git a/internal/middleware/proxytime/gitaly_time.go b/internal/middleware/proxytime/gitaly_time.go new file mode 100644 index 0000000000000000000000000000000000000000..af267a8fa00ac16fc3f758de5b949df7314ff3d8 --- /dev/null +++ b/internal/middleware/proxytime/gitaly_time.go @@ -0,0 +1,33 @@ +package proxytime + +import ( + "context" + "strconv" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// gitalyTimeTrailerKey is the key name for a trailer value representing the total time a request has been in gitaly +const gitalyTimeTrailerKey = "gitaly-time" + +// StreamGitalyTime is a gRPC server-side interceptor that sets a trailer with the total time spent in gitaly. +func StreamGitalyTime(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + startTime := time.Now() + defer func() { + ss.SetTrailer(metadata.New(map[string]string{gitalyTimeTrailerKey: strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 10, 64)})) + }() + + return handler(srv, ss) +} + +// UnaryGitalyTime is a gRPC server-side interceptor that sets a trailer with the total time spent in gitaly. +func UnaryGitalyTime(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + defer func() { + grpc.SetTrailer(ctx, metadata.Pairs(gitalyTimeTrailerKey, strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 10, 64))) + }() + + return handler(ctx, req) +} diff --git a/internal/middleware/proxytime/proxy.go b/internal/middleware/proxytime/proxy.go new file mode 100644 index 0000000000000000000000000000000000000000..0f4f8670244546b972ad0837b41b1c1b85e4e6a2 --- /dev/null +++ b/internal/middleware/proxytime/proxy.go @@ -0,0 +1,87 @@ +package proxytime + +import ( + "context" + "strconv" + "time" + + "github.com/google/uuid" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + // RequestIDKey is the key for a unique request id generated upon every rpc request that goes through praefect + RequestIDKey = "proxy-request-id" +) + +// Unary is a gRPC server-side interceptor that provides a prometheus metric for the latency praefect adds to every gitaly request. +func Unary(tracker *TrailerTracker) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + + requestID := uuid.New().String() + + resp, err := handler(appendToIncomingContext(ctx, metadata.Pairs(RequestIDKey, requestID)), req) + + trailer, trailerErr := tracker.RemoveTrailer(requestID) + if trailerErr != nil { + grpc_logrus.Extract(ctx).WithError(trailerErr).Error("error when getting trailer from tracker") + return resp, err + } + + gitalyTimeTrailer := trailer.Get(gitalyTimeTrailerKey) + if len(gitalyTimeTrailer) > 0 { + gitalyTime, gitalyTimeErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64) + if gitalyTimeErr == nil { + praefectTime := time.Since(startTime) + metrics.ProxyTime.Observe((float64(praefectTime) - gitalyTime) / float64(time.Second)) + } + } + + return resp, err + } +} + +// Stream is a gRPC server-side interceptor that provides a prometheus metric for the latency praefect adds to every gitaly request. +func Stream(tracker *TrailerTracker) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + startTime := time.Now() + + requestID := uuid.New().String() + + wrapped := grpc_middleware.WrapServerStream(ss) + wrapped.WrappedContext = appendToIncomingContext(ss.Context(), metadata.Pairs(RequestIDKey, requestID)) + + err := handler(srv, wrapped) + + trailer, trailerErr := tracker.RemoveTrailer(requestID) + if trailerErr != nil { + grpc_logrus.Extract(ss.Context()).WithError(trailerErr).Error("error when getting trailer from tracker") + return err + } + + gitalyTimeTrailer := trailer.Get(gitalyTimeTrailerKey) + if len(gitalyTimeTrailer) > 0 { + gitalyTime, gitalyTimeErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64) + if gitalyTimeErr == nil { + praefectTime := time.Since(startTime) + metrics.ProxyTime.Observe((float64(praefectTime) - gitalyTime) / float64(time.Second)) + } + } + + return err + } +} + +func appendToIncomingContext(ctx context.Context, md metadata.MD) context.Context { + existingMD, ok := metadata.FromIncomingContext(ctx) + if !ok { + existingMD = make(metadata.MD) + } + + return metadata.NewIncomingContext(ctx, metadata.Join(existingMD, md)) +} diff --git a/internal/middleware/proxytime/trailer.go b/internal/middleware/proxytime/trailer.go new file mode 100644 index 0000000000000000000000000000000000000000..84863435c5e6a4824bf447ba689c1d5a355d843c --- /dev/null +++ b/internal/middleware/proxytime/trailer.go @@ -0,0 +1,58 @@ +package proxytime + +import ( + "errors" + "fmt" + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// MaxTrailers is the cap for number of trailers the trailer tracker can track +const MaxTrailers = 100000 + +// TrailerTracker is an interface that tracks trailers to allow other components to access grpc trailer metadata. +type TrailerTracker struct { + mutex sync.Mutex + trailers map[string]*metadata.MD +} + +// NewTrailerTracker creates a new metrics struct +func NewTrailerTracker() *TrailerTracker { + return &TrailerTracker{ + trailers: make(map[string]*metadata.MD), + } +} + +// Trailer returns a call option that will set the gRPC trailer for a given id. +func (t *TrailerTracker) Trailer(id string) (grpc.CallOption, error) { + t.mutex.Lock() + defer t.mutex.Unlock() + + if len(t.trailers) > MaxTrailers { + return nil, errors.New("maximum number of trailers reached") + } + + if _, ok := t.trailers[id]; ok { + return nil, errors.New("a trailer with the same id is already in flight") + } + + newTrailer := metadata.New(nil) + t.trailers[id] = &newTrailer + + return grpc.Trailer(&newTrailer), nil +} + +// RemoveTrailer deletes the trailer after retrieving it based on the id +func (t *TrailerTracker) RemoveTrailer(id string) (*metadata.MD, error) { + t.mutex.Lock() + defer t.mutex.Unlock() + + if trailer, ok := t.trailers[id]; ok { + delete(t.trailers, id) + return trailer, nil + } + + return nil, fmt.Errorf("trailer with request_id %s does not exist", id) +} diff --git a/internal/middleware/proxytime/trailer_test.go b/internal/middleware/proxytime/trailer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9e9b19c717ee39565f7cfdcb3f8a76af4f90c996 --- /dev/null +++ b/internal/middleware/proxytime/trailer_test.go @@ -0,0 +1,74 @@ +package proxytime_test + +import ( + "net" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestTrailerTracker(t *testing.T) { + srv, socket := runHealthServer(t) + defer srv.Stop() + + tt := proxytime.NewTrailerTracker() + + id := "abc1234" + client := newHealthConnection(t, socket) + + ctx, cancel := testhelper.Context() + defer cancel() + + trailerOption, err := tt.Trailer(id) + require.NoError(t, err) + _, err = client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}, trailerOption) + require.NoError(t, err) + + trailer, err := tt.RemoveTrailer(id) + require.NoError(t, err) + require.Len(t, *trailer, 1) + require.Len(t, trailer.Get("gitaly-time"), 1) + gitalyTime, err := strconv.ParseFloat(trailer.Get("gitaly-time")[0], 64) + require.NoError(t, err) + + require.Greater(t, gitalyTime, 0.0) +} + +func newHealthConnection(t *testing.T, serverSocketPath string) grpc_health_v1.HealthClient { + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + } + + conn, err := grpc.Dial(serverSocketPath, connOpts...) + if err != nil { + t.Fatal(err) + } + + return grpc_health_v1.NewHealthClient(conn) +} + +func runHealthServer(t *testing.T) (*grpc.Server, string) { + opts := []grpc.ServerOption{ + grpc.StreamInterceptor(proxytime.StreamGitalyTime), + grpc.UnaryInterceptor(proxytime.UnaryGitalyTime), + } + + server := grpc.NewServer(opts...) + + healthSrvr := health.NewServer() + grpc_health_v1.RegisterHealthServer(server, healthSrvr) + + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", serverSocketPath) + require.NoError(t, err) + go server.Serve(listener) + + return server, "unix://" + serverSocketPath +} diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index f9130bfd105624b4e2ab601ca9d4f11f8f7abaea..6e78e2a80fcb724ee579212d87b95c77bc322ede 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -11,6 +11,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -189,15 +190,16 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func logEntry := log.Default() ds := datastore.NewInMemory(conf) + tt := proxytime.NewTrailerTracker() clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-0", backend, backendToken) - coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd) + coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, tt, fd) replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections) - srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf) + srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf, tt) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index f1b1d5492a973bc43666dd08c81cff558887d132..95edc167de3d38aeb4a7ef6ff28de4e801a4e921 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -12,6 +12,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/metadata" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -19,6 +21,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -37,21 +40,23 @@ type Coordinator struct { datastore datastore.Datastore - registry *protoregistry.Registry - conf config.Config + registry *protoregistry.Registry + conf config.Config + trailerTracker *proxytime.TrailerTracker } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, trailerTracker *proxytime.TrailerTracker, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) return &Coordinator{ - log: l, - datastore: ds, - registry: registry, - connections: clientConnections, - conf: conf, + log: l, + datastore: ds, + registry: registry, + connections: clientConnections, + conf: conf, + trailerTracker: trailerTracker, } } @@ -110,7 +115,16 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, return nil, fmt.Errorf("unable to find existing client connection for %s", storage) } - return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil + var opts []grpc.CallOption + proxyRequestID := metadata.GetValue(ctx, proxytime.RequestIDKey) + if proxyRequestID != "" { + trailer, err := c.trailerTracker.Trailer(proxyRequestID) + if err == nil { + opts = append(opts, trailer) + } + } + + return proxy.NewStreamParameters(ctx, cc, requestFinalizer, opts), nil } var noopRequestFinalizer = func() {} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4283779a16ec327874f59e3070be22157323399c..15e3fe9feba21de0b6d6026404444a7776b54324 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -9,6 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -60,7 +61,7 @@ func TestStreamDirector(t *testing.T) { clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token") - coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf) + coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf, proxytime.NewTrailerTracker()) require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 6e8791afb3e32d54643683363a4e81f0c57f9dd6..fc8b38e52f188366750ee8bfb179d1b31f99509a 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/client" internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -72,8 +73,9 @@ func testConfig(backends int) config.Config { // injection func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) { var ( + tt = proxytime.NewTrailerTracker() ds = datastore.NewInMemory(conf) - coordinator = NewCoordinator(l, ds, clientCC, conf, fds...) + coordinator = NewCoordinator(l, ds, clientCC, conf, tt, fds...) ) var defaultNode *models.Node @@ -97,6 +99,7 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti l, clientCC, conf, + tt, ) return ds, server @@ -177,8 +180,9 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client ds := datastore.NewInMemory(conf) logEntry := log.Default() + tt := proxytime.NewTrailerTracker() - coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) + coordinator := NewCoordinator(logEntry, ds, clientCC, conf, tt, protoregistry.GitalyProtoFileDescriptors...) replmgr := NewReplMgr( "", @@ -195,6 +199,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client logEntry, clientCC, conf, + tt, ) listener, port := listenAvailPort(t) diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index fb53fe2771c7cabd49a644f1e7c879964bb1510a..61aa26ed9be4fb5f911777245cec4631ce9a7cef 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -1,8 +1,11 @@ package metrics import ( + "sync" + "github.com/prometheus/client_golang/prometheus" promconfig "gitlab.com/gitlab-org/gitaly/internal/config/prometheus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" ) // RegisterReplicationLatency creates and registers a prometheus histogram @@ -43,3 +46,25 @@ type Gauge interface { type Histogram interface { Observe(float64) } + +var once sync.Once + +var ( + // ProxyTime monitors the latency added by praefect to each request + ProxyTime prometheus.Histogram +) + +// RegisterProxyTime registers praefect prometheus metrics +func RegisterProxyTime(c config.Config) { + once.Do(func() { registerProxyTime(c) }) +} + +func registerProxyTime(c config.Config) { + ProxyTime = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "praefect_proxy_time", + Help: "Latency added by praefect", + Buckets: c.Prometheus.GRPCLatencyBuckets, + }) + + prometheus.MustRegister(ProxyTime) +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ceb1b8a766a884a91b3bc6935484b73b664b11b1..141be71503dee89de4d793433e033289fbd4b705 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" @@ -60,7 +61,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options -func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server { +func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config, tt *proxytime.TrailerTracker) *Server { ctxTagOpts := []grpc_ctxtags.Option{ grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor), } @@ -68,6 +69,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + proxytime.Stream(tt), grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.StreamInterceptor, @@ -82,6 +84,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo panichandler.StreamPanicHandler, )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + proxytime.Unary(tt), grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.UnaryInterceptor, diff --git a/internal/server/server.go b/internal/server/server.go index 250749248891d3e3084fdb919ca3f8036a6a8fd5..b0cfcc5ec84d82a31ddf6923d31dc75ee021427c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -20,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -78,6 +79,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { opts := []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + proxytime.StreamGitalyTime, grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.StreamInterceptor, @@ -94,6 +96,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { panichandler.StreamPanicHandler, )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + proxytime.UnaryGitalyTime, grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.UnaryInterceptor,