diff --git a/changelogs/unreleased/jc-add-praefect-metrics.yml b/changelogs/unreleased/jc-add-praefect-metrics.yml new file mode 100644 index 0000000000000000000000000000000000000000..927e2953e38180f03eb3c3699fd607df0940a85c --- /dev/null +++ b/changelogs/unreleased/jc-add-praefect-metrics.yml @@ -0,0 +1,5 @@ +--- +title: Add Praefect summary for how much latency praefect adds to gitaly calls +merge_request: 1655 +author: +type: other diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index b17394a53b938fcfe1ecf41b22cf6c29451bf611..f8bf46cedf6363c13db25af503c2419a52d4241d 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -22,6 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/tracing" @@ -83,6 +84,8 @@ func configure() (config.Config, error) { promMux := http.NewServeMux() promMux.Handle("/metrics", promhttp.Handler()) + metrics.Register(conf) + go func() { http.ListenAndServe(conf.PrometheusListenAddr, promMux) }() diff --git a/internal/middleware/proxytime/gitaly_time.go b/internal/middleware/proxytime/gitaly_time.go new file mode 100644 index 0000000000000000000000000000000000000000..1479fbbde2dc8fbaebbecde1632a4d8c69e355b1 --- /dev/null +++ b/internal/middleware/proxytime/gitaly_time.go @@ -0,0 +1,33 @@ +package proxytime + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// GitalyTimeTrailerKey is the key name for a trailer value representing the total time a request has been in gitaly +const GitalyTimeTrailerKey = "gitaly-time" + +// StreamGitalyTime is a gRPC server-side interceptor that sets a trailer with the total time spent in gitaly. 
+func StreamGitalyTime(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + startTime := time.Now() + defer func() { + ss.SetTrailer(metadata.New(map[string]string{GitalyTimeTrailerKey: fmt.Sprintf("%d", int64(time.Since(startTime)))})) + }() + + return handler(srv, ss) +} + +// UnaryGitalyTime is a gRPC server-side interceptor that sets a trailer with the total time spent in gitaly. +func UnaryGitalyTime(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + defer func() { + grpc.SetTrailer(ctx, metadata.Pairs(GitalyTimeTrailerKey, fmt.Sprintf("%d", int64(time.Since(startTime))))) + }() + + return handler(ctx, req) +} diff --git a/internal/middleware/proxytime/proxy.go b/internal/middleware/proxytime/proxy.go new file mode 100644 index 0000000000000000000000000000000000000000..dedff5a9941e0810653359fa237ef0463c90da5c --- /dev/null +++ b/internal/middleware/proxytime/proxy.go @@ -0,0 +1,74 @@ +package proxytime + +import ( + "context" + "strconv" + "time" + + "github.com/google/uuid" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + // RequestIDKey is the key for a unique request id generated upon every rpc request that goes through praefect + RequestIDKey = "proxy-request-id" +) + +// Unary is a gRPC server-side interceptor that provides a prometheus metric for the latency praefect adds to every gitaly request. 
+func Unary(tracker TrailerTracker) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + + requestID := uuid.New().String() + + resp, err := handler(appendToIncomingContext(ctx, metadata.Pairs(RequestIDKey, requestID)), req) + + gitalyTimeTrailer := tracker.RetrieveTrailer(requestID).Get(GitalyTimeTrailerKey) + if len(gitalyTimeTrailer) > 0 { + gitalyTime, parseErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64) + if parseErr == nil { + praefectTime := time.Since(startTime) + metrics.ProxyTime.Observe(float64(praefectTime) - gitalyTime) + } + } + + return resp, err + } +} + +// Stream is a gRPC server-side interceptor that provides a prometheus metric for the latency praefect adds to every gitaly request. +func Stream(tracker TrailerTracker) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + startTime := time.Now() + + requestID := uuid.New().String() + + wrapped := grpc_middleware.WrapServerStream(ss) + wrapped.WrappedContext = appendToIncomingContext(ss.Context(), metadata.Pairs(RequestIDKey, requestID)) + + err := handler(srv, wrapped) + + gitalyTimeTrailer := tracker.RetrieveTrailer(requestID).Get(GitalyTimeTrailerKey) + if len(gitalyTimeTrailer) > 0 { + gitalyTime, parseErr := strconv.ParseFloat(gitalyTimeTrailer[0], 64) + if parseErr == nil { + praefectTime := time.Since(startTime) + metrics.ProxyTime.Observe(float64(praefectTime) - gitalyTime) + } + } + + return err + } +} + +func appendToIncomingContext(ctx context.Context, md metadata.MD) context.Context { + existingMD, ok := metadata.FromIncomingContext(ctx) + if !ok { + existingMD = make(metadata.MD) + } + + return 
metadata.NewIncomingContext(ctx, metadata.Join(existingMD, md)) +} diff --git a/internal/middleware/proxytime/trailer.go b/internal/middleware/proxytime/trailer.go new file mode 100644 index 0000000000000000000000000000000000000000..aa8e750bc105f7b7e26389b2a864564e6f46ec0d --- /dev/null +++ b/internal/middleware/proxytime/trailer.go @@ -0,0 +1,56 @@ +package proxytime + +import ( + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// TrailerTracker is an interface that tracks trailers to allow other components to access grpc trailer metadata. +type TrailerTracker interface { + Trailer(id string) grpc.CallOption + RetrieveTrailer(id string) *metadata.MD +} + +// Tracker is an implementation of TrailerTracker +type Tracker struct { + mutex sync.Mutex + trailers map[string]*metadata.MD +} + +// NewTrailerTracker creates a new Tracker with an empty trailer map +func NewTrailerTracker() *Tracker { + return &Tracker{ + trailers: make(map[string]*metadata.MD), + } +} + +// Trailer returns a call option that stores the call's trailer under id, creating the entry when absent (hence the exclusive lock) +func (t *Tracker) Trailer(id string) grpc.CallOption { + t.mutex.Lock() + defer t.mutex.Unlock() + + if trailer, ok := t.trailers[id]; ok { + return grpc.Trailer(trailer) + } + + newTrailer := make(metadata.MD) + t.trailers[id] = &newTrailer + + return grpc.Trailer(&newTrailer) +} + +// RetrieveTrailer returns the trailer stored for id (or an empty one) and removes it from the tracker +func (t *Tracker) RetrieveTrailer(id string) *metadata.MD { + t.mutex.Lock() + defer t.mutex.Unlock() + defer delete(t.trailers, id) + + if header, ok := t.trailers[id]; ok { + return header + } + + newHeader := make(metadata.MD) + return &newHeader +} diff --git a/internal/middleware/proxytime/trailer_test.go b/internal/middleware/proxytime/trailer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3774976859be8f75610236880a37d67127d3203c --- /dev/null +++ b/internal/middleware/proxytime/trailer_test.go @@ -0,0 +1,65 @@ +package proxytime_test + +import ( + "net" 
+ "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestTrailerTracker(t *testing.T) { + srv, socket := runHealthServer(t) + defer srv.Stop() + + tt := proxytime.NewTrailerTracker() + + id := "abc1234" + client := newHealthConnection(t, socket) + + ctx, cancel := testhelper.Context() + defer cancel() + + _, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}, tt.Trailer(id)) + require.NoError(t, err) + + trailer := tt.RetrieveTrailer(id) + require.Len(t, *trailer, 1) +} + +func newHealthConnection(t *testing.T, serverSocketPath string) grpc_health_v1.HealthClient { + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + } + + conn, err := grpc.Dial(serverSocketPath, connOpts...) + if err != nil { + t.Fatal(err) + } + + return grpc_health_v1.NewHealthClient(conn) +} + +func runHealthServer(t *testing.T) (*grpc.Server, string) { + opts := []grpc.ServerOption{ + grpc.StreamInterceptor(proxytime.StreamGitalyTime), + grpc.UnaryInterceptor(proxytime.UnaryGitalyTime), + } + + server := grpc.NewServer(opts...) 
+ + healthSrvr := health.NewServer() + grpc_health_v1.RegisterHealthServer(server, healthSrvr) + + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", serverSocketPath) + require.NoError(t, err) + go server.Serve(listener) + + return server, "unix://" + serverSocketPath +} diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index e400350fdaf6540b6442cfce1f5949b3c4856941..63b593d25f668eb32229c18c2a6fe10766a69377 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -14,16 +14,14 @@ import ( // Config is a container for everything found in the TOML config file type Config struct { - VirtualStorageName string `toml:"virtual_storage_name"` - ListenAddr string `toml:"listen_addr"` - SocketPath string `toml:"socket_path"` - - Nodes []*models.Node `toml:"node"` - - Logging log.Config `toml:"logging"` - Sentry sentry.Config `toml:"sentry"` - PrometheusListenAddr string `toml:"prometheus_listen_addr"` - Auth auth.Config `toml:"auth"` + VirtualStorageName string `toml:"virtual_storage_name"` + ListenAddr string `toml:"listen_addr"` + SocketPath string `toml:"socket_path"` + Nodes []*models.Node `toml:"node"` + Logging log.Config `toml:"logging"` + Sentry sentry.Config `toml:"sentry"` + Auth auth.Config `toml:"auth"` + PrometheusListenAddr string `toml:"prometheus_listen_addr"` } // FromFile loads the config for the passed file path diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index fa322da2b44d527166bf0c7e59376850fc6556a6..7aad8583bc00baf005736235a5a7cd82e1c7ce58 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -10,6 +10,8 @@ import ( "syscall" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/metadata" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" 
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -40,8 +42,9 @@ type Coordinator struct { datastore datastore.Datastore - registry *protoregistry.Registry - conf config.Config + registry *protoregistry.Registry + conf config.Config + trailerTracker proxytime.TrailerTracker } // NewCoordinator returns a new Coordinator that utilizes the provided logger @@ -50,11 +53,12 @@ func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections * registry.RegisterFiles(fileDescriptors...) return &Coordinator{ - log: l, - datastore: ds, - registry: registry, - connections: clientConnections, - conf: conf, + log: l, + datastore: ds, + registry: registry, + connections: clientConnections, + conf: conf, + trailerTracker: proxytime.NewTrailerTracker(), } } @@ -64,7 +68,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (proxy.StreamParameters, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. 
c.log.Debugf("Stream director received method %s", fullMethodName) @@ -74,12 +78,12 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { - return nil, nil, nil, err + return nil, err } m, err := protoMessageFromPeeker(mi, peeker) if err != nil { - return nil, nil, nil, err + return nil, err } var requestFinalizer func() @@ -88,22 +92,56 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, if mi.Scope == protoregistry.ScopeRepository { storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) if err != nil { - return nil, nil, nil, err + return nil, err } } else { storage, requestFinalizer, err = c.getAnyStorageNode() if err != nil { - return nil, nil, nil, err + return nil, err } } // We only need the primary node, as there's only one primary storage // location per praefect at this time cc, err := c.connections.GetConnection(storage) if err != nil { - return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage) + return nil, fmt.Errorf("unable to find existing client connection for %s", storage) } - return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil + var opts []grpc.CallOption + proxyRequestID := metadata.GetValue(ctx, proxytime.RequestIDKey) + if proxyRequestID != "" { + opts = append(opts, c.trailerTracker.Trailer(proxyRequestID)) + } + + return &StreamParameters{ + ctx: helper.IncomingToOutgoing(ctx), + conn: cc, + reqFinalizer: requestFinalizer, + callOptions: opts, + }, nil +} + +type StreamParameters struct { + ctx context.Context + conn *grpc.ClientConn + reqFinalizer func() + callOptions []grpc.CallOption +} + +func (s *StreamParameters) Context() context.Context { + return s.ctx +} + +func (s *StreamParameters) Conn() *grpc.ClientConn { + return s.conn +} + +func (s *StreamParameters) RequestFinalizer() func() { + return s.reqFinalizer +} + 
+func (s *StreamParameters) CallOptions() []grpc.CallOption { + return s.callOptions } var noopRequestFinalizer = func() {} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 76a0a84d3f71000c87fd6f245d82bd5a59f5a286..d29b39fa5fa15b014a23081eef55e07b7a6a0a87 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -21,6 +22,7 @@ import ( var testLogger = logrus.New() func init() { + metrics.Register(config.Config{}) testLogger.SetOutput(ioutil.Discard) } @@ -69,9 +71,10 @@ func TestStreamDirector(t *testing.T) { fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool" peeker := &mockPeeker{frame} - _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, fullMethod, peeker) + //_, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, fullMethod, peeker) + streamParams, err := coordinator.streamDirector(ctx, fullMethod, peeker) require.NoError(t, err) - require.Equal(t, address, conn.Target()) + require.Equal(t, address, streamParams.Conn().Target()) mi, err := coordinator.registry.LookupMethod(fullMethod) require.NoError(t, err) @@ -107,7 +110,7 @@ func TestStreamDirector(t *testing.T) { require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct") - jobUpdateFunc() + streamParams.RequestFinalizer() jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, 1, 10) require.NoError(t, err) diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go 
index 82712765f65dcd511b9dc7848f61e1fb0a534543..32aa8d7248cab5190283499b9b96e3a9bc8a81ab 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -21,4 +21,11 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) +type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (StreamParameters, error) + +type StreamParameters interface { + Context() context.Context + Conn() *grpc.ClientConn + RequestFinalizer() func() + CallOptions() []grpc.CallOption +} diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 6d4a3238ee09fae8dd83b821c14ed8533786a9d5..7a1f5b65a33b70de45bc614d440c34e7e6911990 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -33,16 +33,17 @@ func ExampleRegisterService() { func ExampleTransparentHandler() { grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + ) } // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (proxy.StreamParameters, error) { // Make sure we never forward internal services. 
if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) // Copy the inbound metadata explicitly. @@ -53,12 +54,12 @@ func ExampleStreamDirector() { if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, nil, err + return &testStreamParameters{ctx: outCtx, conn: conn}, err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, nil, err + return &testStreamParameters{ctx: outCtx, conn: conn}, err } } - return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index d30a13e58c0dc267ef56289941c0e1eeaf6da8bd..b68529bac64867d522812b8cc511b594e756b0a0 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -16,6 +16,11 @@ import ( "google.golang.org/grpc/status" ) +const ( + // RequestIDKey is the key for a unique request id generated upon every rpc request that goes through praefect + RequestIDKey = "proxy-request-id" +) + var ( clientStreamDescForProxying = &grpc.StreamDesc{ ServerStreams: true, @@ -28,7 +33,7 @@ var ( // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. 
func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director} + streamer := &handler{director: director} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -51,7 +56,7 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director} + streamer := &handler{director: director} return streamer.handler } @@ -72,20 +77,18 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error peeker := newPeeker(serverStream) // We require that the director's returned context inherits from the serverStream.Context(). - outgoingCtx, backendConn, requestFinalizer, err := s.director(serverStream.Context(), fullMethodName, peeker) + streamParams, err := s.director(serverStream.Context(), fullMethodName, peeker) if err != nil { return err } - defer func() { - if requestFinalizer != nil { - requestFinalizer() - } - }() + if finalizer := streamParams.RequestFinalizer(); finalizer != nil { + defer finalizer() + } - clientCtx, clientCancel := context.WithCancel(outgoingCtx) + clientCtx, clientCancel := context.WithCancel(streamParams.Context()) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. - clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, streamParams.Conn(), fullMethodName, streamParams.CallOptions()...) 
if err != nil { return err } @@ -94,6 +97,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ s2cErrChan := s.forwardServerToClient(serverStream, clientStream, peeker.consumedStream) c2sErrChan := s.forwardClientToServer(clientStream, serverStream) + // We don't know which side is going to stop sending first, so we need a select between the two. for i := 0; i < 2; i++ { select { @@ -114,10 +118,12 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers // will be nil. serverStream.SetTrailer(clientStream.Trailer()) + // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. if c2sErr != io.EOF { return c2sErr } + return nil } } diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index c57837d2a9e74c7c1199575c03584279265297ac..62344c46b1bbdb7471fedd7341ef3a2ccdada0db 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -16,6 +16,8 @@ import ( "testing" "time" + "fmt" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -26,8 +28,6 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" - "fmt" - pb "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata" ) @@ -185,6 +185,29 @@ func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") } +type testStreamParameters struct { + ctx context.Context + conn *grpc.ClientConn + reqFinalizer func() + callOptions []grpc.CallOption +} + +func (s *testStreamParameters) Context() context.Context { + return s.ctx +} + 
+func (s *testStreamParameters) Conn() *grpc.ClientConn { + return s.conn +} + +func (s *testStreamParameters) RequestFinalizer() func() { + return s.reqFinalizer +} + +func (s *testStreamParameters) CallOptions() []grpc.CallOption { + return s.callOptions +} + func (s *ProxyHappySuite) TestPingStream_StressTest() { for i := 0; i < 50; i++ { s.TestPingStream_FullDuplexWorks() @@ -207,17 +230,17 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (proxy.StreamParameters, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return ctx, nil, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection") + return nil, grpc.Errorf(codes.PermissionDenied, "testing rejection") } } // Explicitly copy the metadata, otherwise the tests will fail. 
outCtx, _ := context.WithCancel(ctx) outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) - return outCtx, s.serverClientConn, nil, nil + return &testStreamParameters{ctx: outCtx, conn: s.serverClientConn}, nil } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index a31e7af34b3bf80006e97c88e983f709d3559683..72186b9282d091e2968cc232d35e109d33faf792 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -8,7 +8,6 @@ import ( "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "golang.org/x/net/context" - "google.golang.org/grpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,7 +27,7 @@ func TestStreamPeeking(t *testing.T) { 
pingReqSent := &testservice.PingRequest{Value: "hi"} // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (proxy.StreamParameters, error) { t.Logf("director routing method %s to backend", fullMethodName) peekedMsg, err := peeker.Peek() @@ -39,7 +38,7 @@ func TestStreamPeeking(t *testing.T) { require.NoError(t, err) require.Equal(t, pingReqSent, peekedRequest) - return ctx, backendCC, nil, nil + return &testStreamParameters{ctx: ctx, conn: backendCC}, nil } pingResp := &testservice.PingResponse{ @@ -87,7 +86,7 @@ func TestStreamInjecting(t *testing.T) { newValue := "bye" // director will peek into stream and change some frames - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (proxy.StreamParameters, error) { t.Logf("modifying request for method %s", fullMethodName) peekedMsg, err := peeker.Peek() @@ -104,7 +103,7 @@ func TestStreamInjecting(t *testing.T) { require.NoError(t, peeker.Modify(newPayload)) - return ctx, backendCC, nil, nil + return &testStreamParameters{ctx: ctx, conn: backendCC}, nil } pingResp := &testservice.PingResponse{ diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go new file mode 100644 index 0000000000000000000000000000000000000000..15c6a328fa8e57f69e4d602495a82e64b22425f5 --- /dev/null +++ b/internal/praefect/metrics/prometheus.go @@ -0,0 +1,30 @@ +package metrics + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" +) + +var once sync.Once + +var ( + // ProxyTime monitors the latency added by praefect to 
each request + ProxyTime prometheus.Histogram +) + +// Register registers praefect prometheus metrics +func Register(c config.Config) { + once.Do(func() { register(c) }) +} + +func register(c config.Config) { + ProxyTime = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "praefect_proxy_time", + Help: "Latency added by praefect", + Buckets: prometheus.LinearBuckets(100000, 100000, 20), + }) + + prometheus.MustRegister(ProxyTime) +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ada0d05f6cec9e5c918f2f402705badf514c67a6..0467362059e03fc1fbddb91aa96b26c94d1d0dba 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" @@ -59,6 +60,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler + proxytime.Stream(c.trailerTracker), grpc_prometheus.StreamServerInterceptor, grpc_logrus.StreamServerInterceptor(l), sentryhandler.StreamLogHandler, @@ -72,6 +74,7 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.UnaryInterceptor, + proxytime.Unary(c.trailerTracker), grpc_prometheus.UnaryServerInterceptor, grpc_logrus.UnaryServerInterceptor(l), 
sentryhandler.UnaryLogHandler, diff --git a/internal/server/server.go b/internal/server/server.go index 97013d48e7c54f1ed4a0117b28f797b00b211092..6c8ea3c9d392baf63ba807252d7112a3bead908e 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" + "gitlab.com/gitlab-org/gitaly/internal/middleware/proxytime" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -80,6 +81,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { grpc_ctxtags.StreamServerInterceptor(ctxTagOpts...), grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.StreamInterceptor, + proxytime.StreamGitalyTime, grpc_prometheus.StreamServerInterceptor, grpc_logrus.StreamServerInterceptor(logrusEntry), sentryhandler.StreamLogHandler, @@ -96,6 +98,7 @@ func createNewServer(rubyServer *rubyserver.Server, secure bool) *grpc.Server { grpc_ctxtags.UnaryServerInterceptor(ctxTagOpts...), grpccorrelation.UnaryServerCorrelationInterceptor(), // Must be above the metadata handler metadatahandler.UnaryInterceptor, + proxytime.UnaryGitalyTime, grpc_prometheus.UnaryServerInterceptor, grpc_logrus.UnaryServerInterceptor(logrusEntry), sentryhandler.UnaryLogHandler,