From d5530ab6715cbf9f365ce833fabeb8d17a37bd9b Mon Sep 17 00:00:00 2001 From: Ben Kochie Date: Thu, 1 Nov 2018 12:08:50 +0100 Subject: [PATCH 1/2] Update vendor google.golang.org/grpc/...@v1.10 --- vendor/google.golang.org/grpc/call.go | 306 +----------------- vendor/google.golang.org/grpc/clientconn.go | 67 ++-- vendor/google.golang.org/grpc/codec.go | 98 +----- vendor/google.golang.org/grpc/codes/codes.go | 23 +- .../grpc/credentials/credentials.go | 5 +- .../grpc/encoding/encoding.go | 99 ++++-- .../grpc/encoding/proto/proto.go | 110 +++++++ vendor/google.golang.org/grpc/go16.go | 5 +- vendor/google.golang.org/grpc/go17.go | 5 +- .../grpc/metadata/metadata.go | 55 +++- .../grpc/resolver/resolver.go | 21 +- .../grpc/resolver_conn_wrapper.go | 14 +- vendor/google.golang.org/grpc/rpc_util.go | 98 +++++- vendor/google.golang.org/grpc/server.go | 33 +- .../google.golang.org/grpc/status/status.go | 12 +- vendor/google.golang.org/grpc/stream.go | 284 +++++++--------- .../grpc/transport/handler_server.go | 67 +++- .../grpc/transport/http2_client.go | 63 ++-- .../grpc/transport/http2_server.go | 29 +- .../grpc/transport/http_util.go | 67 +++- .../grpc/transport/transport.go | 31 +- vendor/vendor.json | 232 ++++++------- 22 files changed, 896 insertions(+), 828 deletions(-) create mode 100644 vendor/google.golang.org/grpc/encoding/proto/proto.go diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index 13cf8b13b59..a66e3c2d958 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -19,131 +19,13 @@ package grpc import ( - "io" - "time" - "golang.org/x/net/context" - "golang.org/x/net/trace" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/encoding" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/stats" - "google.golang.org/grpc/status" - "google.golang.org/grpc/transport" ) -// recvResponse receives and parses an RPC response. 
-// On error, it returns the error and indicates whether the call should be retried. -// -// TODO(zhaoq): Check whether the received message sequence is valid. -// TODO ctx is used for stats collection and processing. It is the context passed from the application. -func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { - // Try to acquire header metadata from the server if there is any. - defer func() { - if err != nil { - if _, ok := err.(transport.ConnectionError); !ok { - t.CloseStream(stream, err) - } - } - }() - c.headerMD, err = stream.Header() - if err != nil { - return - } - p := &parser{r: stream} - var inPayload *stats.InPayload - if dopts.copts.StatsHandler != nil { - inPayload = &stats.InPayload{ - Client: true, - } - } - for { - if c.maxReceiveMessageSize == nil { - return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") - } - - // Set dc if it exists and matches the message compression type used, - // otherwise set comp if a registered compressor exists for it. - var comp encoding.Compressor - var dc Decompressor - if rc := stream.RecvCompress(); dopts.dc != nil && dopts.dc.Type() == rc { - dc = dopts.dc - } else if rc != "" && rc != encoding.Identity { - comp = encoding.GetCompressor(rc) - } - if err = recv(p, dopts.codec, stream, dc, reply, *c.maxReceiveMessageSize, inPayload, comp); err != nil { - if err == io.EOF { - break - } - return - } - } - if inPayload != nil && err == io.EOF && stream.Status().Code() == codes.OK { - // TODO in the current implementation, inTrailer may be handled before inPayload in some cases. - // Fix the order if necessary. - dopts.copts.StatsHandler.HandleRPC(ctx, inPayload) - } - c.trailerMD = stream.Trailer() - return nil -} - -// sendRequest writes out various information of an RPC such as Context and Message. 
-func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { - defer func() { - if err != nil { - // If err is connection error, t will be closed, no need to close stream here. - if _, ok := err.(transport.ConnectionError); !ok { - t.CloseStream(stream, err) - } - } - }() - var ( - outPayload *stats.OutPayload - ) - if dopts.copts.StatsHandler != nil { - outPayload = &stats.OutPayload{ - Client: true, - } - } - // Set comp and clear compressor if a registered compressor matches the type - // specified via UseCompressor. (And error if a matching compressor is not - // registered.) - var comp encoding.Compressor - if ct := c.compressorType; ct != "" && ct != encoding.Identity { - compressor = nil // Disable the legacy compressor. - comp = encoding.GetCompressor(ct) - if comp == nil { - return status.Errorf(codes.Internal, "grpc: Compressor is not installed for grpc-encoding %q", ct) - } - } - hdr, data, err := encode(dopts.codec, args, compressor, outPayload, comp) - if err != nil { - return err - } - if c.maxSendMessageSize == nil { - return status.Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") - } - if len(data) > *c.maxSendMessageSize { - return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize) - } - err = t.Write(stream, hdr, data, opts) - if err == nil && outPayload != nil { - outPayload.SentTime = time.Now() - dopts.copts.StatsHandler.HandleRPC(ctx, outPayload) - } - // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method - // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following - // recvResponse to get the final status. - if err != nil && err != io.EOF { - return err - } - // Sent successfully. 
- return nil -} - // Invoke sends the RPC request on the wire and returns after response is // received. This is typically called by generated code. +// +// All errors returned by Invoke are compatible with the status package. func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) @@ -159,188 +41,34 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return cc.Invoke(ctx, method, args, reply, opts...) } -func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { - c := defaultCallInfo() - mc := cc.GetMethodConfig(method) - if mc.WaitForReady != nil { - c.failFast = !*mc.WaitForReady - } - - if mc.Timeout != nil && *mc.Timeout >= 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) - defer cancel() - } - - opts = append(cc.dopts.callOptions, opts...) - for _, o := range opts { - if err := o.before(c); err != nil { - return toRPCErr(err) - } - } - defer func() { - for _, o := range opts { - o.after(c) - } - }() - - c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) - c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) +var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false} - if EnableTracing { - c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) - defer c.traceInfo.tr.Finish() - c.traceInfo.firstLine.client = true - if deadline, ok := ctx.Deadline(); ok { - c.traceInfo.firstLine.deadline = deadline.Sub(time.Now()) - } - c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false) - // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set. 
- defer func() { - if e != nil { - c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true) - c.traceInfo.tr.SetError() - } - }() - } - ctx = newContextWithRPCInfo(ctx, c.failFast) - sh := cc.dopts.copts.StatsHandler - if sh != nil { - ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) - begin := &stats.Begin{ - Client: true, - BeginTime: time.Now(), - FailFast: c.failFast, - } - sh.HandleRPC(ctx, begin) - defer func() { - end := &stats.End{ - Client: true, - EndTime: time.Now(), - Error: e, - } - sh.HandleRPC(ctx, end) - }() - } - topts := &transport.Options{ - Last: true, - Delay: false, - } - callHdr := &transport.CallHdr{ - Host: cc.authority, - Method: method, - } - if c.creds != nil { - callHdr.Creds = c.creds - } - if c.compressorType != "" { - callHdr.SendCompress = c.compressorType - } else if cc.dopts.cp != nil { - callHdr.SendCompress = cc.dopts.cp.Type() - } +func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { + // TODO: implement retries in clientStream and make this simply + // newClientStream, SendMsg, RecvMsg. firstAttempt := true - for { - // Check to make sure the context has expired. This will prevent us from - // looping forever if an error occurs for wait-for-ready RPCs where no data - // is sent on the wire. - select { - case <-ctx.Done(): - return toRPCErr(ctx.Err()) - default: - } - - // Record the done handler from Balancer.Get(...). It is called once the - // RPC has completed or failed. - t, done, err := cc.getTransport(ctx, c.failFast) + csInt, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } - stream, err := t.NewStream(ctx, callHdr) - if err != nil { - if done != nil { - done(balancer.DoneInfo{Err: err}) - } - // In the event of any error from NewStream, we never attempted to write - // anything to the wire, so we can retry indefinitely for non-fail-fast - // RPCs. 
- if !c.failFast { + cs := csInt.(*clientStream) + if err := cs.SendMsg(req); err != nil { + if !cs.c.failFast && cs.s.Unprocessed() && firstAttempt { + // TODO: Add a field to header for grpc-transparent-retry-attempts + firstAttempt = false continue } - return toRPCErr(err) - } - if peer, ok := peer.FromContext(stream.Context()); ok { - c.peer = peer - } - if c.traceInfo.tr != nil { - c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) - } - err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts) - if err != nil { - if done != nil { - done(balancer.DoneInfo{ - Err: err, - BytesSent: true, - BytesReceived: stream.BytesReceived(), - }) - } - // Retry a non-failfast RPC when - // i) the server started to drain before this RPC was initiated. - // ii) the server refused the stream. - if !c.failFast && stream.Unprocessed() { - // In this case, the server did not receive the data, but we still - // created wire traffic, so we should not retry indefinitely. - if firstAttempt { - // TODO: Add a field to header for grpc-transparent-retry-attempts - firstAttempt = false - continue - } - // Otherwise, give up and return an error anyway. - } - return toRPCErr(err) - } - err = recvResponse(ctx, cc.dopts, t, c, stream, reply) - if err != nil { - if done != nil { - done(balancer.DoneInfo{ - Err: err, - BytesSent: true, - BytesReceived: stream.BytesReceived(), - }) - } - if !c.failFast && stream.Unprocessed() { - // In these cases, the server did not receive the data, but we still - // created wire traffic, so we should not retry indefinitely. - if firstAttempt { - // TODO: Add a field to header for grpc-transparent-retry-attempts - firstAttempt = false - continue - } - // Otherwise, give up and return an error anyway. 
- } - return toRPCErr(err) - } - if c.traceInfo.tr != nil { - c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) - } - t.CloseStream(stream, nil) - err = stream.Status().Err() - if done != nil { - done(balancer.DoneInfo{ - Err: err, - BytesSent: true, - BytesReceived: stream.BytesReceived(), - }) + return err } - if !c.failFast && stream.Unprocessed() { - // In these cases, the server did not receive the data, but we still - // created wire traffic, so we should not retry indefinitely. - if firstAttempt { + if err := cs.RecvMsg(reply); err != nil { + if !cs.c.failFast && cs.s.Unprocessed() && firstAttempt { // TODO: Add a field to header for grpc-transparent-retry-attempts firstAttempt = false continue } + return err } - return err + return nil } } diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index bfbef362138..208e3c9b69d 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -32,6 +32,7 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/balancer" _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" @@ -40,17 +41,17 @@ import ( _ "google.golang.org/grpc/resolver/dns" // To register dns resolver. _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver. "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" "google.golang.org/grpc/transport" ) var ( // ErrClientConnClosing indicates that the operation is illegal because // the ClientConn is closing. - ErrClientConnClosing = errors.New("grpc: the client connection is closing") - // ErrClientConnTimeout indicates that the ClientConn cannot establish the - // underlying connections within the specified timeout. - // DEPRECATED: Please use context.DeadlineExceeded instead. 
- ErrClientConnTimeout = errors.New("grpc: timed out when dialing") + // + // Deprecated: this error should not be relied upon by users; use the status + // code of Canceled instead. + ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errConnDrain = errors.New("grpc: the connection is drained") // errConnClosing indicates that the connection is closing. @@ -85,7 +86,6 @@ var ( type dialOptions struct { unaryInt UnaryClientInterceptor streamInt StreamClientInterceptor - codec Codec cp Compressor dc Decompressor bs backoffStrategy @@ -99,10 +99,8 @@ type dialOptions struct { // balancer, and also by WithBalancerName dial option. balancerBuilder balancer.Builder // This is to support grpclb. - resolverBuilder resolver.Builder - // Custom user options for resolver.Build. - resolverBuildUserOptions interface{} - waitForHandshake bool + resolverBuilder resolver.Builder + waitForHandshake bool } const ( @@ -167,10 +165,10 @@ func WithDefaultCallOptions(cos ...CallOption) DialOption { } // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. +// +// Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead. func WithCodec(c Codec) DialOption { - return func(o *dialOptions) { - o.codec = c - } + return WithDefaultCallOptions(CallCustomCodec(c)) } // WithCompressor returns a DialOption which sets a Compressor to use for @@ -236,14 +234,6 @@ func withResolverBuilder(b resolver.Builder) DialOption { } } -// WithResolverUserOptions returns a DialOption which sets the UserOptions -// field of resolver's BuildOption. -func WithResolverUserOptions(userOpt interface{}) DialOption { - return func(o *dialOptions) { - o.resolverBuildUserOptions = userOpt - } -} - // WithServiceConfig returns a DialOption which has a channel to read the service configuration. 
// DEPRECATED: service config should be received through name resolver, as specified here. // https://github.com/grpc/grpc/blob/master/doc/service_config.md @@ -407,6 +397,10 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { // cancel or expire the pending connection. Once this function returns, the // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close // to terminate all the pending operations after this function returns. +// +// The target name syntax is defined in +// https://github.com/grpc/grpc/blob/master/doc/naming.md. +// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, @@ -482,14 +476,28 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * default: } } - // Set defaults. - if cc.dopts.codec == nil { - cc.dopts.codec = protoCodec{} - } if cc.dopts.bs == nil { cc.dopts.bs = DefaultBackoffConfig } - cc.parsedTarget = parseTarget(cc.target) + if cc.dopts.resolverBuilder == nil { + // Only try to parse target when resolver builder is not already set. + cc.parsedTarget = parseTarget(cc.target) + grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme) + cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) + if cc.dopts.resolverBuilder == nil { + // If resolver builder is still nil, the parse target's scheme is + // not registered. Fallback to default resolver and set Endpoint to + // the original unparsed target. 
+ grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme) + cc.parsedTarget = resolver.Target{ + Scheme: resolver.GetDefaultScheme(), + Endpoint: target, + } + cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) + } + } else { + cc.parsedTarget = resolver.Target{Endpoint: target} + } creds := cc.dopts.copts.TransportCredentials if creds != nil && creds.Info().ServerName != "" { cc.authority = creds.Info().ServerName @@ -1385,3 +1393,10 @@ func (ac *addrConn) getState() connectivity.State { defer ac.mu.Unlock() return ac.state } + +// ErrClientConnTimeout indicates that the ClientConn cannot establish the +// underlying connections within the specified timeout. +// +// Deprecated: This error is never returned by grpc and should not be +// referenced by users. +var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") diff --git a/vendor/google.golang.org/grpc/codec.go b/vendor/google.golang.org/grpc/codec.go index 43d81ed2ab4..12977654781 100644 --- a/vendor/google.golang.org/grpc/codec.go +++ b/vendor/google.golang.org/grpc/codec.go @@ -19,96 +19,32 @@ package grpc import ( - "math" - "sync" - - "github.com/golang/protobuf/proto" + "google.golang.org/grpc/encoding" + _ "google.golang.org/grpc/encoding/proto" // to register the Codec for "proto" ) +// baseCodec contains the functionality of both Codec and encoding.Codec, but +// omits the name/string, which vary between the two and are not needed for +// anything besides the registry in the encoding package. +type baseCodec interface { + Marshal(v interface{}) ([]byte, error) + Unmarshal(data []byte, v interface{}) error +} + +var _ baseCodec = Codec(nil) +var _ baseCodec = encoding.Codec(nil) + // Codec defines the interface gRPC uses to encode and decode messages. // Note that implementations of this interface must be thread safe; // a Codec's methods can be called from concurrent goroutines. +// +// Deprecated: use encoding.Codec instead. 
type Codec interface { // Marshal returns the wire format of v. Marshal(v interface{}) ([]byte, error) // Unmarshal parses the wire format into v. Unmarshal(data []byte, v interface{}) error - // String returns the name of the Codec implementation. The returned - // string will be used as part of content type in transmission. + // String returns the name of the Codec implementation. This is unused by + // gRPC. String() string } - -// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC. -type protoCodec struct { -} - -type cachedProtoBuffer struct { - lastMarshaledSize uint32 - proto.Buffer -} - -func capToMaxInt32(val int) uint32 { - if val > math.MaxInt32 { - return uint32(math.MaxInt32) - } - return uint32(val) -} - -func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) { - protoMsg := v.(proto.Message) - newSlice := make([]byte, 0, cb.lastMarshaledSize) - - cb.SetBuf(newSlice) - cb.Reset() - if err := cb.Marshal(protoMsg); err != nil { - return nil, err - } - out := cb.Bytes() - cb.lastMarshaledSize = capToMaxInt32(len(out)) - return out, nil -} - -func (p protoCodec) Marshal(v interface{}) ([]byte, error) { - if pm, ok := v.(proto.Marshaler); ok { - // object can marshal itself, no need for buffer - return pm.Marshal() - } - - cb := protoBufferPool.Get().(*cachedProtoBuffer) - out, err := p.marshal(v, cb) - - // put back buffer and lose the ref to the slice - cb.SetBuf(nil) - protoBufferPool.Put(cb) - return out, err -} - -func (p protoCodec) Unmarshal(data []byte, v interface{}) error { - protoMsg := v.(proto.Message) - protoMsg.Reset() - - if pu, ok := protoMsg.(proto.Unmarshaler); ok { - // object can unmarshal itself, no need for buffer - return pu.Unmarshal(data) - } - - cb := protoBufferPool.Get().(*cachedProtoBuffer) - cb.SetBuf(data) - err := cb.Unmarshal(protoMsg) - cb.SetBuf(nil) - protoBufferPool.Put(cb) - return err -} - -func (protoCodec) String() string { - return "proto" -} - -var 
protoBufferPool = &sync.Pool{ - New: func() interface{} { - return &cachedProtoBuffer{ - Buffer: proto.Buffer{}, - lastMarshaledSize: 16, - } - }, -} diff --git a/vendor/google.golang.org/grpc/codes/codes.go b/vendor/google.golang.org/grpc/codes/codes.go index f3719d56280..a8280ae660d 100644 --- a/vendor/google.golang.org/grpc/codes/codes.go +++ b/vendor/google.golang.org/grpc/codes/codes.go @@ -19,6 +19,7 @@ // Package codes defines the canonical error codes used by gRPC. It is // consistent across various languages. package codes // import "google.golang.org/grpc/codes" + import ( "fmt" ) @@ -33,9 +34,9 @@ const ( // Canceled indicates the operation was canceled (typically by the caller). Canceled Code = 1 - // Unknown error. An example of where this error may be returned is + // Unknown error. An example of where this error may be returned is // if a Status value received from another address space belongs to - // an error-space that is not known in this address space. Also + // an error-space that is not known in this address space. Also // errors raised by APIs that do not return enough error information // may be converted to this error. Unknown Code = 2 @@ -64,15 +65,11 @@ const ( // PermissionDenied indicates the caller does not have permission to // execute the specified operation. It must not be used for rejections // caused by exhausting some resource (use ResourceExhausted - // instead for those errors). It must not be + // instead for those errors). It must not be // used if the caller cannot be identified (use Unauthenticated // instead for those errors). PermissionDenied Code = 7 - // Unauthenticated indicates the request does not have valid - // authentication credentials for the operation. - Unauthenticated Code = 16 - // ResourceExhausted indicates some resource has been exhausted, perhaps // a per-user quota, or perhaps the entire file system is out of space. 
ResourceExhausted Code = 8 @@ -88,7 +85,7 @@ const ( // (b) Use Aborted if the client should retry at a higher-level // (e.g., restarting a read-modify-write sequence). // (c) Use FailedPrecondition if the client should not retry until - // the system state has been explicitly fixed. E.g., if an "rmdir" + // the system state has been explicitly fixed. E.g., if an "rmdir" // fails because the directory is non-empty, FailedPrecondition // should be returned since the client should not retry unless // they have first fixed up the directory by deleting files from it. @@ -117,7 +114,7 @@ const ( // file size. // // There is a fair bit of overlap between FailedPrecondition and - // OutOfRange. We recommend using OutOfRange (the more specific + // OutOfRange. We recommend using OutOfRange (the more specific // error) when it applies so that callers who are iterating through // a space can easily look for an OutOfRange error to detect when // they are done. @@ -127,8 +124,8 @@ const ( // supported/enabled in this service. Unimplemented Code = 12 - // Internal errors. Means some invariants expected by underlying - // system has been broken. If you see one of these errors, + // Internal errors. Means some invariants expected by underlying + // system has been broken. If you see one of these errors, // something is very broken. Internal Code = 13 @@ -142,6 +139,10 @@ const ( // DataLoss indicates unrecoverable data loss or corruption. DataLoss Code = 15 + + // Unauthenticated indicates the request does not have valid + // authentication credentials for the operation. 
+ Unauthenticated Code = 16 ) var strToCode = map[string]Code{ diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go index 1d2e864f8f4..3351bf0ee5f 100644 --- a/vendor/google.golang.org/grpc/credentials/credentials.go +++ b/vendor/google.golang.org/grpc/credentials/credentials.go @@ -43,8 +43,9 @@ type PerRPCCredentials interface { // GetRequestMetadata gets the current request metadata, refreshing // tokens if required. This should be called by the transport layer on // each request, and the data should be populated in headers or other - // context. uri is the URI of the entry point for the request. When - // supported by the underlying implementation, ctx can be used for + // context. If a status code is returned, it will be used as the status + // for the RPC. uri is the URI of the entry point for the request. + // When supported by the underlying implementation, ctx can be used for // timeout and cancellation. // TODO(zhaoq): Define the set of the qualified keys instead of leaving // it as an arbitrary string. diff --git a/vendor/google.golang.org/grpc/encoding/encoding.go b/vendor/google.golang.org/grpc/encoding/encoding.go index 47d10b07666..8e26c194364 100644 --- a/vendor/google.golang.org/grpc/encoding/encoding.go +++ b/vendor/google.golang.org/grpc/encoding/encoding.go @@ -16,46 +16,103 @@ * */ -// Package encoding defines the interface for the compressor and the functions -// to register and get the compossor. +// Package encoding defines the interface for the compressor and codec, and +// functions to register and retrieve compressors and codecs. +// // This package is EXPERIMENTAL. package encoding import ( "io" + "strings" ) -var registerCompressor = make(map[string]Compressor) +// Identity specifies the optional encoding for uncompressed streams. +// It is intended for grpc internal use only. 
+const Identity = "identity" -// Compressor is used for compressing and decompressing when sending or receiving messages. +// Compressor is used for compressing and decompressing when sending or +// receiving messages. type Compressor interface { - // Compress writes the data written to wc to w after compressing it. If an error - // occurs while initializing the compressor, that error is returned instead. + // Compress writes the data written to wc to w after compressing it. If an + // error occurs while initializing the compressor, that error is returned + // instead. Compress(w io.Writer) (io.WriteCloser, error) - // Decompress reads data from r, decompresses it, and provides the uncompressed data - // via the returned io.Reader. If an error occurs while initializing the decompressor, that error - // is returned instead. + // Decompress reads data from r, decompresses it, and provides the + // uncompressed data via the returned io.Reader. If an error occurs while + // initializing the decompressor, that error is returned instead. Decompress(r io.Reader) (io.Reader, error) - // Name is the name of the compression codec and is used to set the content coding header. + // Name is the name of the compression codec and is used to set the content + // coding header. The result must be static; the result cannot change + // between calls. Name() string } -// RegisterCompressor registers the compressor with gRPC by its name. It can be activated when -// sending an RPC via grpc.UseCompressor(). It will be automatically accessed when receiving a -// message based on the content coding header. Servers also use it to send a response with the -// same encoding as the request. +var registeredCompressor = make(map[string]Compressor) + +// RegisterCompressor registers the compressor with gRPC by its name. It can +// be activated when sending an RPC via grpc.UseCompressor(). It will be +// automatically accessed when receiving a message based on the content coding +// header. 
Servers also use it to send a response with the same encoding as +// the request. // -// NOTE: this function must only be called during initialization time (i.e. in an init() function). If -// multiple Compressors are registered with the same name, the one registered last will take effect. +// NOTE: this function must only be called during initialization time (i.e. in +// an init() function), and is not thread-safe. If multiple Compressors are +// registered with the same name, the one registered last will take effect. func RegisterCompressor(c Compressor) { - registerCompressor[c.Name()] = c + registeredCompressor[c.Name()] = c } // GetCompressor returns Compressor for the given compressor name. func GetCompressor(name string) Compressor { - return registerCompressor[name] + return registeredCompressor[name] } -// Identity specifies the optional encoding for uncompressed streams. -// It is intended for grpc internal use only. -const Identity = "identity" +// Codec defines the interface gRPC uses to encode and decode messages. Note +// that implementations of this interface must be thread safe; a Codec's +// methods can be called from concurrent goroutines. +type Codec interface { + // Marshal returns the wire format of v. + Marshal(v interface{}) ([]byte, error) + // Unmarshal parses the wire format into v. + Unmarshal(data []byte, v interface{}) error + // Name returns the name of the Codec implementation. The returned string + // will be used as part of content type in transmission. The result must be + // static; the result cannot change between calls. + Name() string +} + +var registeredCodecs = make(map[string]Codec, 0) + +// RegisterCodec registers the provided Codec for use with all gRPC clients and +// servers. +// +// The Codec will be stored and looked up by result of its Name() method, which +// should match the content-subtype of the encoding handled by the Codec. This +// is case-insensitive, and is stored and looked up as lowercase. 
If the +// result of calling Name() is an empty string, RegisterCodec will panic. See +// Content-Type on +// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for +// more details. +// +// NOTE: this function must only be called during initialization time (i.e. in +// an init() function), and is not thread-safe. If multiple Compressors are +// registered with the same name, the one registered last will take effect. +func RegisterCodec(codec Codec) { + if codec == nil { + panic("cannot register a nil Codec") + } + contentSubtype := strings.ToLower(codec.Name()) + if contentSubtype == "" { + panic("cannot register Codec with empty string result for String()") + } + registeredCodecs[contentSubtype] = codec +} + +// GetCodec gets a registered Codec by content-subtype, or nil if no Codec is +// registered for the content-subtype. +// +// The content-subtype is expected to be lowercase. +func GetCodec(contentSubtype string) Codec { + return registeredCodecs[contentSubtype] +} diff --git a/vendor/google.golang.org/grpc/encoding/proto/proto.go b/vendor/google.golang.org/grpc/encoding/proto/proto.go new file mode 100644 index 00000000000..66b97a6f692 --- /dev/null +++ b/vendor/google.golang.org/grpc/encoding/proto/proto.go @@ -0,0 +1,110 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package proto defines the protobuf codec. Importing this package will +// register the codec. 
+package proto + +import ( + "math" + "sync" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/encoding" +) + +// Name is the name registered for the proto compressor. +const Name = "proto" + +func init() { + encoding.RegisterCodec(codec{}) +} + +// codec is a Codec implementation with protobuf. It is the default codec for gRPC. +type codec struct{} + +type cachedProtoBuffer struct { + lastMarshaledSize uint32 + proto.Buffer +} + +func capToMaxInt32(val int) uint32 { + if val > math.MaxInt32 { + return uint32(math.MaxInt32) + } + return uint32(val) +} + +func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) { + protoMsg := v.(proto.Message) + newSlice := make([]byte, 0, cb.lastMarshaledSize) + + cb.SetBuf(newSlice) + cb.Reset() + if err := cb.Marshal(protoMsg); err != nil { + return nil, err + } + out := cb.Bytes() + cb.lastMarshaledSize = capToMaxInt32(len(out)) + return out, nil +} + +func (codec) Marshal(v interface{}) ([]byte, error) { + if pm, ok := v.(proto.Marshaler); ok { + // object can marshal itself, no need for buffer + return pm.Marshal() + } + + cb := protoBufferPool.Get().(*cachedProtoBuffer) + out, err := marshal(v, cb) + + // put back buffer and lose the ref to the slice + cb.SetBuf(nil) + protoBufferPool.Put(cb) + return out, err +} + +func (codec) Unmarshal(data []byte, v interface{}) error { + protoMsg := v.(proto.Message) + protoMsg.Reset() + + if pu, ok := protoMsg.(proto.Unmarshaler); ok { + // object can unmarshal itself, no need for buffer + return pu.Unmarshal(data) + } + + cb := protoBufferPool.Get().(*cachedProtoBuffer) + cb.SetBuf(data) + err := cb.Unmarshal(protoMsg) + cb.SetBuf(nil) + protoBufferPool.Put(cb) + return err +} + +func (codec) Name() string { + return Name +} + +var protoBufferPool = &sync.Pool{ + New: func() interface{} { + return &cachedProtoBuffer{ + Buffer: proto.Buffer{}, + lastMarshaledSize: 16, + } + }, +} diff --git a/vendor/google.golang.org/grpc/go16.go 
b/vendor/google.golang.org/grpc/go16.go index f3dbf21700f..0ae4dbda9e6 100644 --- a/vendor/google.golang.org/grpc/go16.go +++ b/vendor/google.golang.org/grpc/go16.go @@ -48,6 +48,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro // toRPCErr converts an error into an error from the status package. func toRPCErr(err error) error { + if err == nil || err == io.EOF { + return err + } if _, ok := status.FromError(err); ok { return err } @@ -62,8 +65,6 @@ func toRPCErr(err error) error { return status.Error(codes.DeadlineExceeded, err.Error()) case context.Canceled: return status.Error(codes.Canceled, err.Error()) - case ErrClientConnClosing: - return status.Error(codes.FailedPrecondition, err.Error()) } } return status.Error(codes.Unknown, err.Error()) diff --git a/vendor/google.golang.org/grpc/go17.go b/vendor/google.golang.org/grpc/go17.go index de23098eb9a..53908828083 100644 --- a/vendor/google.golang.org/grpc/go17.go +++ b/vendor/google.golang.org/grpc/go17.go @@ -49,6 +49,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro // toRPCErr converts an error into an error from the status package. 
func toRPCErr(err error) error { + if err == nil || err == io.EOF { + return err + } if _, ok := status.FromError(err); ok { return err } @@ -63,8 +66,6 @@ func toRPCErr(err error) error { return status.Error(codes.DeadlineExceeded, err.Error()) case context.Canceled, netctx.Canceled: return status.Error(codes.Canceled, err.Error()) - case ErrClientConnClosing: - return status.Error(codes.FailedPrecondition, err.Error()) } } return status.Error(codes.Unknown, err.Error()) diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go index ccfea5d4530..15662b5d890 100644 --- a/vendor/google.golang.org/grpc/metadata/metadata.go +++ b/vendor/google.golang.org/grpc/metadata/metadata.go @@ -17,7 +17,8 @@ */ // Package metadata define the structure of the metadata supported by gRPC library. -// Please refer to https://grpc.io/docs/guides/wire.html for more information about custom-metadata. +// Please refer to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md +// for more information about custom-metadata. package metadata // import "google.golang.org/grpc/metadata" import ( @@ -115,9 +116,22 @@ func NewIncomingContext(ctx context.Context, md MD) context.Context { return context.WithValue(ctx, mdIncomingKey{}, md) } -// NewOutgoingContext creates a new context with outgoing md attached. +// NewOutgoingContext creates a new context with outgoing md attached. If used +// in conjunction with AppendToOutgoingContext, NewOutgoingContext will +// overwrite any previously-appended metadata. func NewOutgoingContext(ctx context.Context, md MD) context.Context { - return context.WithValue(ctx, mdOutgoingKey{}, md) + return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md}) +} + +// AppendToOutgoingContext returns a new context with the provided kv merged +// with any existing metadata in the context. Please refer to the +// documentation of Pairs for a description of kv. 
+func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context { + if len(kv)%2 == 1 { + panic(fmt.Sprintf("metadata: AppendToOutgoingContext got an odd number of input pairs for metadata: %d", len(kv))) + } + md, _ := ctx.Value(mdOutgoingKey{}).(rawMD) + return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: append(md.added, kv)}) } // FromIncomingContext returns the incoming metadata in ctx if it exists. The @@ -128,10 +142,39 @@ func FromIncomingContext(ctx context.Context) (md MD, ok bool) { return } +// FromOutgoingContextRaw returns the un-merged, intermediary contents +// of rawMD. Remember to perform strings.ToLower on the keys. The returned +// MD should not be modified. Writing to it may cause races. Modification +// should be made to copies of the returned MD. +// +// This is intended for gRPC-internal use ONLY. +func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) { + raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD) + if !ok { + return nil, nil, false + } + + return raw.md, raw.added, true +} + // FromOutgoingContext returns the outgoing metadata in ctx if it exists. The // returned MD should not be modified. Writing to it may cause races. // Modification should be made to the copies of the returned MD. 
-func FromOutgoingContext(ctx context.Context) (md MD, ok bool) { - md, ok = ctx.Value(mdOutgoingKey{}).(MD) - return +func FromOutgoingContext(ctx context.Context) (MD, bool) { + raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD) + if !ok { + return nil, false + } + + mds := make([]MD, 0, len(raw.added)+1) + mds = append(mds, raw.md) + for _, vv := range raw.added { + mds = append(mds, Pairs(vv...)) + } + return Join(mds...), ok +} + +type rawMD struct { + md MD + added [][]string } diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go index df097eedf79..775ee4d0d27 100644 --- a/vendor/google.golang.org/grpc/resolver/resolver.go +++ b/vendor/google.golang.org/grpc/resolver/resolver.go @@ -36,21 +36,12 @@ func Register(b Builder) { } // Get returns the resolver builder registered with the given scheme. -// If no builder is register with the scheme, the default scheme will -// be used. -// If the default scheme is not modified, "passthrough" will be the default -// scheme, and the preinstalled dns resolver will be used. -// If the default scheme is modified, and a resolver is registered with -// the scheme, that resolver will be returned. -// If the default scheme is modified, and no resolver is registered with -// the scheme, nil will be returned. +// +// If no builder is register with the scheme, nil will be returned. func Get(scheme string) Builder { if b, ok := m[scheme]; ok { return b } - if b, ok := m[defaultScheme]; ok { - return b - } return nil } @@ -60,6 +51,11 @@ func SetDefaultScheme(scheme string) { defaultScheme = scheme } +// GetDefaultScheme gets the default scheme that will be used. +func GetDefaultScheme() string { + return defaultScheme +} + // AddressType indicates the address type returned by name resolution. type AddressType uint8 @@ -90,9 +86,6 @@ type Address struct { // BuildOption includes additional information for the builder to create // the resolver. 
type BuildOption struct { - // UserOptions can be used to pass configuration between DialOptions and the - // resolver. - UserOptions interface{} } // ClientConn contains the callbacks for resolver to notify any updates diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go index ef5d4c28692..d394c5349db 100644 --- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go +++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go @@ -48,6 +48,9 @@ func split2(s, sep string) (string, string, bool) { // parseTarget splits target into a struct containing scheme, authority and // endpoint. +// +// If target is not a valid scheme://authority/endpoint, it returns {Endpoint: +// target}. func parseTarget(target string) (ret resolver.Target) { var ok bool ret.Scheme, ret.Endpoint, ok = split2(target, "://") @@ -65,14 +68,9 @@ func parseTarget(target string) (ret resolver.Target) { // If withResolverBuilder dial option is set, the specified resolver will be // used instead. 
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { - grpclog.Infof("dialing to target with scheme: %q", cc.parsedTarget.Scheme) - rb := cc.dopts.resolverBuilder if rb == nil { - rb = resolver.Get(cc.parsedTarget.Scheme) - if rb == nil { - return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme) - } + return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme) } ccr := &ccResolverWrapper{ @@ -83,9 +81,7 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { } var err error - ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{ - UserOptions: cc.dopts.resolverBuildUserOptions, - }) + ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{}) if err != nil { return nil, err } diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index bf384b64489..58e6c538e5b 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -25,6 +25,7 @@ import ( "io" "io/ioutil" "math" + "strings" "sync" "time" @@ -32,6 +33,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -125,13 +127,13 @@ func (d *gzipDecompressor) Type() string { type callInfo struct { compressorType string failFast bool - headerMD metadata.MD - trailerMD metadata.MD - peer *peer.Peer + stream *transport.Stream traceInfo traceInfo // in trace.go maxReceiveMessageSize *int maxSendMessageSize *int creds credentials.PerRPCCredentials + contentSubtype string + codec baseCodec } func defaultCallInfo() *callInfo { @@ -172,7 +174,9 @@ func (o afterCall) after(c *callInfo) { o(c) } // for a unary RPC. 
func Header(md *metadata.MD) CallOption { return afterCall(func(c *callInfo) { - *md = c.headerMD + if c.stream != nil { + *md, _ = c.stream.Header() + } }) } @@ -180,16 +184,20 @@ func Header(md *metadata.MD) CallOption { // for a unary RPC. func Trailer(md *metadata.MD) CallOption { return afterCall(func(c *callInfo) { - *md = c.trailerMD + if c.stream != nil { + *md = c.stream.Trailer() + } }) } // Peer returns a CallOption that retrieves peer information for a // unary RPC. -func Peer(peer *peer.Peer) CallOption { +func Peer(p *peer.Peer) CallOption { return afterCall(func(c *callInfo) { - if c.peer != nil { - *peer = *c.peer + if c.stream != nil { + if x, ok := peer.FromContext(c.stream.Context()); ok { + *p = *x + } } }) } @@ -248,6 +256,49 @@ func UseCompressor(name string) CallOption { }) } +// CallContentSubtype returns a CallOption that will set the content-subtype +// for a call. For example, if content-subtype is "json", the Content-Type over +// the wire will be "application/grpc+json". The content-subtype is converted +// to lowercase before being included in Content-Type. See Content-Type on +// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for +// more details. +// +// If CallCustomCodec is not also used, the content-subtype will be used to +// look up the Codec to use in the registry controlled by RegisterCodec. See +// the documention on RegisterCodec for details on registration. The lookup +// of content-subtype is case-insensitive. If no such Codec is found, the call +// will result in an error with code codes.Internal. +// +// If CallCustomCodec is also used, that Codec will be used for all request and +// response messages, with the content-subtype set to the given contentSubtype +// here for requests. 
+func CallContentSubtype(contentSubtype string) CallOption { + contentSubtype = strings.ToLower(contentSubtype) + return beforeCall(func(c *callInfo) error { + c.contentSubtype = contentSubtype + return nil + }) +} + +// CallCustomCodec returns a CallOption that will set the given Codec to be +// used for all request and response messages for a call. The result of calling +// String() will be used as the content-subtype in a case-insensitive manner. +// +// See Content-Type on +// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for +// more details. Also see the documentation on RegisterCodec and +// CallContentSubtype for more details on the interaction between Codec and +// content-subtype. +// +// This function is provided for advanced users; prefer to use only +// CallContentSubtype to select a registered codec instead. +func CallCustomCodec(codec Codec) CallOption { + return beforeCall(func(c *callInfo) error { + c.codec = codec + return nil + }) +} + // The format of the payload: compressed or not? type payloadFormat uint8 @@ -263,8 +314,8 @@ type parser struct { // error types. r io.Reader - // The header of a gRPC message. Find more detail - // at https://grpc.io/docs/guides/wire.html. + // The header of a gRPC message. Find more detail at + // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte } @@ -313,7 +364,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt // encode serializes msg and returns a buffer of message header and a buffer of msg. // If msg is nil, it generates the message header and an empty msg buffer. // TODO(ddyihai): eliminate extra Compressor parameter. 
-func encode(c Codec, msg interface{}, cp Compressor, outPayload *stats.OutPayload, compressor encoding.Compressor) ([]byte, []byte, error) { +func encode(c baseCodec, msg interface{}, cp Compressor, outPayload *stats.OutPayload, compressor encoding.Compressor) ([]byte, []byte, error) { var ( b []byte cbuf *bytes.Buffer @@ -390,7 +441,7 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool // For the two compressor parameters, both should not be set, but if they are, // dc takes precedence over compressor. // TODO(dfawley): wrap the old compressor/decompressor using the new API? -func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error { +func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error { pf, d, err := p.recvMsg(maxReceiveMessageSize) if err != nil { return err @@ -485,6 +536,27 @@ func Errorf(c codes.Code, format string, a ...interface{}) error { return status.Errorf(c, format, a...) } +// setCallInfoCodec should only be called after CallOptions have been applied. +func setCallInfoCodec(c *callInfo) error { + if c.codec != nil { + // codec was already set by a CallOption; use it. + return nil + } + + if c.contentSubtype == "" { + // No codec specified in CallOptions; use proto by default. + c.codec = encoding.GetCodec(proto.Name) + return nil + } + + // c.contentSubtype is already lowercased in CallContentSubtype + c.codec = encoding.GetCodec(c.contentSubtype) + if c.codec == nil { + return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype) + } + return nil +} + // The SupportPackageIsVersion variables are referenced from generated protocol // buffer files to ensure compatibility with the gRPC version used. 
The latest // support package version is 5. @@ -500,6 +572,6 @@ const ( ) // Version is the current grpc version. -const Version = "1.9.1" +const Version = "1.10.1" const grpcUA = "grpc-go/" + Version diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index f651621685e..0f7ff5d6022 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -40,6 +40,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/keepalive" @@ -105,7 +106,7 @@ type Server struct { type options struct { creds credentials.TransportCredentials - codec Codec + codec baseCodec cp Compressor dc Decompressor unaryInt UnaryServerInterceptor @@ -182,6 +183,8 @@ func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { } // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. +// +// This will override any lookups by content-subtype for Codecs registered with RegisterCodec. func CustomCodec(codec Codec) ServerOption { return func(o *options) { o.codec = codec @@ -327,10 +330,6 @@ func NewServer(opt ...ServerOption) *Server { for _, o := range opt { o(&opts) } - if opts.codec == nil { - // Set the default codec. - opts.codec = protoCodec{} - } s := &Server{ lis: make(map[net.Listener]bool), opts: opts, @@ -695,7 +694,7 @@ func (s *Server) serveUsingHandler(conn net.Conn) { // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL // and subject to change. 
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - st, err := transport.NewServerHandlerTransport(w, r) + st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -759,7 +758,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str if s.opts.statsHandler != nil { outPayload = &stats.OutPayload{} } - hdr, data, err := encode(s.opts.codec, msg, cp, outPayload, comp) + hdr, data, err := encode(s.getCodec(stream.ContentSubtype()), msg, cp, outPayload, comp) if err != nil { grpclog.Errorln("grpc: server failed to encode response: ", err) return err @@ -904,7 +903,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. // java implementation. return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize) } - if err := s.opts.codec.Unmarshal(req, v); err != nil { + if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil { return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) } if inPayload != nil { @@ -996,7 +995,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp t: t, s: stream, p: &parser{r: stream}, - codec: s.opts.codec, + codec: s.getCodec(stream.ContentSubtype()), maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, trInfo: trInfo, @@ -1262,6 +1261,22 @@ func init() { } } +// contentSubtype must be lowercase +// cannot return nil +func (s *Server) getCodec(contentSubtype string) baseCodec { + if s.opts.codec != nil { + return s.opts.codec + } + if contentSubtype == "" { + return encoding.GetCodec(proto.Name) + } + codec := encoding.GetCodec(contentSubtype) + if codec == nil { + return encoding.GetCodec(proto.Name) + } + return codec +} + // SetHeader sets the header metadata. 
// When called multiple times, all the provided metadata will be merged. // All the metadata will be sent out when one of the following happens: diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go index d9defaebcf5..3a42dc6de02 100644 --- a/vendor/google.golang.org/grpc/status/status.go +++ b/vendor/google.golang.org/grpc/status/status.go @@ -120,7 +120,8 @@ func FromProto(s *spb.Status) *Status { } // FromError returns a Status representing err if it was produced from this -// package, otherwise it returns nil, false. +// package. Otherwise, ok is false and a Status is returned with codes.Unknown +// and the original error message. func FromError(err error) (s *Status, ok bool) { if err == nil { return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true @@ -128,7 +129,14 @@ func FromError(err error) (s *Status, ok bool) { if se, ok := err.(*statusError); ok { return se.status(), true } - return nil, false + return New(codes.Unknown, err.Error()), false +} + +// Convert is a convenience function which removes the need to handle the +// boolean return value from FromError. +func Convert(err error) *Status { + s, _ := FromError(err) + return s } // WithDetails returns a new status with the provided details messages appended to the status. diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index f9138199510..deb73592728 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" "google.golang.org/grpc/transport" @@ -51,6 +50,8 @@ type StreamDesc struct { } // Stream defines the common interface a client or server stream has to satisfy. 
+// +// All errors returned from Stream are compatible with the status package. type Stream interface { // Context returns the context for this stream. Context() context.Context @@ -89,8 +90,9 @@ type ClientStream interface { // Stream.SendMsg() may return a non-nil error when something wrong happens sending // the request. The returned error indicates the status of this sending, not the final // status of the RPC. - // Always call Stream.RecvMsg() to get the final status if you care about the status of - // the RPC. + // + // Always call Stream.RecvMsg() to drain the stream and get the final + // status, otherwise there could be leaked resources. Stream } @@ -112,26 +114,28 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { - var ( - t transport.ClientTransport - s *transport.Stream - done func(balancer.DoneInfo) - cancel context.CancelFunc - ) c := defaultCallInfo() mc := cc.GetMethodConfig(method) if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady } + // Possible context leak: + // The cancel function for the child context we create will only be called + // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if + // an error is generated by SendMsg. + // https://github.com/grpc/grpc-go/issues/1818. + var cancel context.CancelFunc if mc.Timeout != nil && *mc.Timeout >= 0 { ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) - defer func() { - if err != nil { - cancel() - } - }() + } else { + ctx, cancel = context.WithCancel(ctx) } + defer func() { + if err != nil { + cancel() + } + }() opts = append(cc.dopts.callOptions, opts...) 
for _, o := range opts { @@ -141,6 +145,9 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) + if err := setCallInfoCodec(c); err != nil { + return nil, err + } callHdr := &transport.CallHdr{ Host: cc.authority, @@ -149,7 +156,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // so we don't flush the header. // If it's client streaming, the user may never send a request or send it any // time soon, so we ask the transport to flush the header. - Flush: desc.ClientStreams, + Flush: desc.ClientStreams, + ContentSubtype: c.contentSubtype, } // Set our outgoing compression according to the UseCompressor CallOption, if @@ -214,6 +222,11 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth }() } + var ( + t transport.ClientTransport + s *transport.Stream + done func(balancer.DoneInfo) + ) for { // Check to make sure the context has expired. This will prevent us from // looping forever if an error occurs for wait-for-ready RPCs where no data @@ -232,14 +245,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth s, err = t.NewStream(ctx, callHdr) if err != nil { if done != nil { - doneInfo := balancer.DoneInfo{Err: err} - if _, ok := err.(transport.ConnectionError); ok { - // If error is connection error, transport was sending data on wire, - // and we are not sure if anything has been sent on wire. - // If error is not connection error, we are sure nothing has been sent. 
- doneInfo.BytesSent = true - } - done(doneInfo) + done(balancer.DoneInfo{Err: err}) done = nil } // In the event of any error from NewStream, we never attempted to write @@ -253,15 +259,12 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth break } - // Set callInfo.peer object from stream's context. - if peer, ok := peer.FromContext(s.Context()); ok { - c.peer = peer - } + c.stream = s cs := &clientStream{ opts: opts, c: c, desc: desc, - codec: cc.dopts.codec, + codec: c.codec, cp: cp, dc: cc.dopts.dc, comp: comp, @@ -278,29 +281,21 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth statsCtx: ctx, statsHandler: cc.dopts.copts.StatsHandler, } - // Listen on s.Context().Done() to detect cancellation and s.Done() to detect - // normal termination when there is no pending I/O operations on this stream. - go func() { - select { - case <-t.Error(): - // Incur transport error, simply exit. - case <-cc.ctx.Done(): - cs.finish(ErrClientConnClosing) - cs.closeTransportStream(ErrClientConnClosing) - case <-s.Done(): - // TODO: The trace of the RPC is terminated here when there is no pending - // I/O, which is probably not the optimal solution. - cs.finish(s.Status().Err()) - cs.closeTransportStream(nil) - case <-s.GoAway(): - cs.finish(errConnDrain) - cs.closeTransportStream(errConnDrain) - case <-s.Context().Done(): - err := s.Context().Err() - cs.finish(err) - cs.closeTransportStream(transport.ContextErr(err)) - } - }() + if desc != unaryStreamDesc { + // Listen on cc and stream contexts to cleanup when the user closes the + // ClientConn or cancels the stream context. In all other cases, an error + // should already be injected into the recv buffer by the transport, which + // the client will eventually receive, and then we will cancel the stream's + // context in clientStream.finish. 
+ go func() { + select { + case <-cc.ctx.Done(): + cs.finish(ErrClientConnClosing) + case <-ctx.Done(): + cs.finish(toRPCErr(s.Context().Err())) + } + }() + } return cs, nil } @@ -313,20 +308,22 @@ type clientStream struct { p *parser desc *StreamDesc - codec Codec + codec baseCodec cp Compressor dc Decompressor comp encoding.Compressor decomp encoding.Compressor decompSet bool + // cancel is only called when RecvMsg() returns non-nil error, which means + // the stream finishes with error or with io.EOF. cancel context.CancelFunc tracing bool // set to EnableTracing when the clientStream is created. mu sync.Mutex done func(balancer.DoneInfo) - closed bool + sentLast bool // sent an end stream finished bool // trInfo.tr is set when the clientStream is created (if EnableTracing is true), // and is set to nil when the clientStream's finish method is called. @@ -346,9 +343,8 @@ func (cs *clientStream) Context() context.Context { func (cs *clientStream) Header() (metadata.MD, error) { m, err := cs.s.Header() if err != nil { - if _, ok := err.(transport.ConnectionError); !ok { - cs.closeTransportStream(err) - } + err = toRPCErr(err) + cs.finish(err) } return m, err } @@ -358,6 +354,7 @@ func (cs *clientStream) Trailer() metadata.MD { } func (cs *clientStream) SendMsg(m interface{}) (err error) { + // TODO: Check cs.sentLast and error if we already ended the stream. if cs.tracing { cs.mu.Lock() if cs.trInfo.tr != nil { @@ -368,26 +365,18 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { // TODO Investigate how to signal the stats handling party. // generate error stats if err != nil && err != io.EOF? defer func() { - if err != nil { - cs.finish(err) - } - if err == nil { - return - } - if err == io.EOF { - // Specialize the process for server streaming. SendMsg is only called - // once when creating the stream object. io.EOF needs to be skipped when - // the rpc is early finished (before the stream object is created.). 
- // TODO: It is probably better to move this into the generated code. - if !cs.desc.ClientStreams && cs.desc.ServerStreams { - err = nil - } - return + // For non-client-streaming RPCs, we return nil instead of EOF on success + // because the generated code requires it. finish is not called; RecvMsg() + // will call it with the stream's status independently. + if err == io.EOF && !cs.desc.ClientStreams { + err = nil } - if _, ok := err.(transport.ConnectionError); !ok { - cs.closeTransportStream(err) + if err != nil && err != io.EOF { + // Call finish for errors generated by this SendMsg call. (Transport + // errors are converted to an io.EOF error below; the real error will be + // returned from RecvMsg eventually in that case.) + cs.finish(err) } - err = toRPCErr(err) }() var outPayload *stats.OutPayload if cs.statsHandler != nil { @@ -399,30 +388,36 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { return err } - if cs.c.maxSendMessageSize == nil { - return status.Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") - } if len(data) > *cs.c.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize) } - err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: false}) - if err == nil && outPayload != nil { - outPayload.SentTime = time.Now() - cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) + if !cs.desc.ClientStreams { + cs.sentLast = true } - return err + err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams}) + if err == nil { + if outPayload != nil { + outPayload.SentTime = time.Now() + cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) + } + return nil + } + return io.EOF } func (cs *clientStream) RecvMsg(m interface{}) (err error) { + defer func() { + if err != nil || !cs.desc.ServerStreams { + // err != nil or non-server-streaming indicates end of stream. 
+ cs.finish(err) + } + }() var inPayload *stats.InPayload if cs.statsHandler != nil { inPayload = &stats.InPayload{ Client: true, } } - if cs.c.maxReceiveMessageSize == nil { - return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") - } if !cs.decompSet { // Block until we receive headers containing received message encoding. if ct := cs.s.RecvCompress(); ct != "" && ct != encoding.Identity { @@ -440,98 +435,67 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { cs.decompSet = true } err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload, cs.decomp) - defer func() { - // err != nil indicates the termination of the stream. - if err != nil { - cs.finish(err) - } - }() - if err == nil { - if cs.tracing { - cs.mu.Lock() - if cs.trInfo.tr != nil { - cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) - } - cs.mu.Unlock() - } - if inPayload != nil { - cs.statsHandler.HandleRPC(cs.statsCtx, inPayload) - } - if !cs.desc.ClientStreams || cs.desc.ServerStreams { - return - } - // Special handling for client streaming rpc. - // This recv expects EOF or errors, so we don't collect inPayload. - if cs.c.maxReceiveMessageSize == nil { - return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") - } - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp) - cs.closeTransportStream(err) - if err == nil { - return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) - } + if err != nil { if err == io.EOF { - if se := cs.s.Status().Err(); se != nil { - return se + if statusErr := cs.s.Status().Err(); statusErr != nil { + return statusErr } - cs.finish(err) - return nil + return io.EOF // indicates successful end of stream. 
} return toRPCErr(err) } - if _, ok := err.(transport.ConnectionError); !ok { - cs.closeTransportStream(err) - } - if err == io.EOF { - if statusErr := cs.s.Status().Err(); statusErr != nil { - return statusErr + if cs.tracing { + cs.mu.Lock() + if cs.trInfo.tr != nil { + cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) } - // Returns io.EOF to indicate the end of the stream. - return + cs.mu.Unlock() } - return toRPCErr(err) -} - -func (cs *clientStream) CloseSend() (err error) { - err = cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true}) - defer func() { - if err != nil { - cs.finish(err) - } - }() - if err == nil || err == io.EOF { + if inPayload != nil { + cs.statsHandler.HandleRPC(cs.statsCtx, inPayload) + } + if cs.desc.ServerStreams { + // Subsequent messages should be received by subsequent RecvMsg calls. return nil } - if _, ok := err.(transport.ConnectionError); !ok { - cs.closeTransportStream(err) + + // Special handling for non-server-stream rpcs. + // This recv expects EOF or errors, so we don't collect inPayload. + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp) + if err == nil { + return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) + } + if err == io.EOF { + return cs.s.Status().Err() // non-server streaming Recv returns nil on success } - err = toRPCErr(err) - return + return toRPCErr(err) } -func (cs *clientStream) closeTransportStream(err error) { - cs.mu.Lock() - if cs.closed { - cs.mu.Unlock() - return +func (cs *clientStream) CloseSend() error { + if cs.sentLast { + return nil } - cs.closed = true - cs.mu.Unlock() - cs.t.CloseStream(cs.s, err) + cs.sentLast = true + cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true}) + // We ignore errors from Write and always return nil here. 
Any error it + // would return would also be returned by a subsequent RecvMsg call, and the + // user is supposed to always finish the stream by calling RecvMsg until it + // returns err != nil. + return nil } func (cs *clientStream) finish(err error) { + if err == io.EOF { + // Ending a stream with EOF indicates a success. + err = nil + } cs.mu.Lock() defer cs.mu.Unlock() if cs.finished { return } cs.finished = true - defer func() { - if cs.cancel != nil { - cs.cancel() - } - }() + cs.t.CloseStream(cs.s, err) for _, o := range cs.opts { o.after(cs.c) } @@ -547,18 +511,16 @@ func (cs *clientStream) finish(err error) { end := &stats.End{ Client: true, EndTime: time.Now(), - } - if err != io.EOF { - // end.Error is nil if the RPC finished successfully. - end.Error = toRPCErr(err) + Error: err, } cs.statsHandler.HandleRPC(cs.statsCtx, end) } + cs.cancel() if !cs.tracing { return } if cs.trInfo.tr != nil { - if err == nil || err == io.EOF { + if err == nil { cs.trInfo.tr.LazyPrintf("RPC: [OK]") } else { cs.trInfo.tr.LazyPrintf("RPC: [%v]", err) @@ -593,7 +555,7 @@ type serverStream struct { t transport.ServerTransport s *transport.Stream p *parser - codec Codec + codec baseCodec cp Compressor dc Decompressor diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go index 27c4ebb5f10..451d7e629df 100644 --- a/vendor/google.golang.org/grpc/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/transport/handler_server.go @@ -40,20 +40,24 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" ) // NewServerHandlerTransport returns a ServerTransport handling gRPC // from inside an http.Handler. It requires that the http Server // supports HTTP/2. 
-func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) { +func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) { if r.ProtoMajor != 2 { return nil, errors.New("gRPC requires HTTP/2") } if r.Method != "POST" { return nil, errors.New("invalid gRPC request method") } - if !validContentType(r.Header.Get("Content-Type")) { + contentType := r.Header.Get("Content-Type") + // TODO: do we assume contentType is lowercase? we did before + contentSubtype, validContentType := contentSubtype(contentType) + if !validContentType { return nil, errors.New("invalid gRPC request content-type") } if _, ok := w.(http.Flusher); !ok { @@ -64,10 +68,13 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr } st := &serverHandlerTransport{ - rw: w, - req: r, - closedCh: make(chan struct{}), - writes: make(chan func()), + rw: w, + req: r, + closedCh: make(chan struct{}), + writes: make(chan func()), + contentType: contentType, + contentSubtype: contentSubtype, + stats: stats, } if v := r.Header.Get("grpc-timeout"); v != "" { @@ -79,7 +86,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr st.timeout = to } - var metakv []string + metakv := []string{"content-type", contentType} if r.Host != "" { metakv = append(metakv, ":authority", r.Host) } @@ -126,6 +133,14 @@ type serverHandlerTransport struct { // block concurrent WriteStatus calls // e.g. 
grpc/(*serverStream).SendMsg/RecvMsg writeStatusMu sync.Mutex + + // we just mirror the request content-type + contentType string + // we store both contentType and contentSubtype so we don't keep recreating them + // TODO make sure this is consistent across handler_server and http2_server + contentSubtype string + + stats stats.Handler } func (ht *serverHandlerTransport) Close() error { @@ -219,6 +234,9 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro }) if err == nil { // transport has not been closed + if ht.stats != nil { + ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) + } ht.Close() close(ht.writes) } @@ -235,7 +253,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { h := ht.rw.Header() h["Date"] = nil // suppress Date to make tests happy; TODO: restore - h.Set("Content-Type", "application/grpc") + h.Set("Content-Type", ht.contentType) // Predeclare trailers we'll set later in WriteStatus (after the body). // This is a SHOULD in the HTTP RFC, and the way you add (known) @@ -263,7 +281,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts } func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { - return ht.do(func() { + err := ht.do(func() { ht.writeCommonHeaders(s) h := ht.rw.Header() for k, vv := range md { @@ -279,6 +297,13 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { ht.rw.WriteHeader(200) ht.rw.(http.Flusher).Flush() }) + + if err == nil { + if ht.stats != nil { + ht.stats.HandleRPC(s.Context(), &stats.OutHeader{}) + } + } + return err } func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { @@ -313,13 +338,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace req := ht.req s := &Stream{ - id: 0, // irrelevant - requestRead: func(int) {}, - cancel: cancel, - buf: newRecvBuffer(), - st: ht, - 
method: req.URL.Path, - recvCompress: req.Header.Get("grpc-encoding"), + id: 0, // irrelevant + requestRead: func(int) {}, + cancel: cancel, + buf: newRecvBuffer(), + st: ht, + method: req.URL.Path, + recvCompress: req.Header.Get("grpc-encoding"), + contentSubtype: ht.contentSubtype, } pr := &peer.Peer{ Addr: ht.RemoteAddr(), @@ -330,6 +356,15 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace ctx = metadata.NewIncomingContext(ctx, ht.headerMD) ctx = peer.NewContext(ctx, pr) s.ctx = newContextWithStream(ctx, s) + if ht.stats != nil { + s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) + inHeader := &stats.InHeader{ + FullMethod: s.method, + RemoteAddr: ht.RemoteAddr(), + Compression: s.recvCompress, + } + ht.stats.HandleRPC(s.ctx, inHeader) + } s.trReader = &transportReader{ reader: &recvBufferReader{ctx: s.ctx, recv: s.buf}, windowHandler: func(int) {}, diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index 4a122692af2..56b434ef37f 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -314,15 +314,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // TODO(zhaoq): Handle uint32 overflow of Stream.id. 
s := &Stream{ - id: t.nextID, - done: make(chan struct{}), - goAway: make(chan struct{}), - method: callHdr.Method, - sendCompress: callHdr.SendCompress, - buf: newRecvBuffer(), - fc: &inFlow{limit: uint32(t.initialWindowSize)}, - sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), - headerChan: make(chan struct{}), + id: t.nextID, + done: make(chan struct{}), + goAway: make(chan struct{}), + method: callHdr.Method, + sendCompress: callHdr.SendCompress, + buf: newRecvBuffer(), + fc: &inFlow{limit: uint32(t.initialWindowSize)}, + sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), + headerChan: make(chan struct{}), + contentSubtype: callHdr.ContentSubtype, } t.nextID += 2 s.requestRead = func(n int) { @@ -380,7 +381,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea for _, c := range t.creds { data, err := c.GetRequestMetadata(ctx, audience) if err != nil { - return nil, streamErrorf(codes.Internal, "transport: %v", err) + if _, ok := status.FromError(err); ok { + return nil, err + } + + return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err) } for k, v := range data { // Capital header names are illegal in HTTP/2. 
@@ -434,7 +439,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme}) headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method}) headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) - headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)}) headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"}) @@ -459,7 +464,22 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if b := stats.OutgoingTrace(ctx); b != nil { headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) } - if md, ok := metadata.FromOutgoingContext(ctx); ok { + + if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { + var k string + for _, vv := range added { + for i, v := range vv { + if i%2 == 0 { + k = v + continue + } + // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. + if isReservedHeader(k) { + continue + } + headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)}) + } + } for k, vv := range md { // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. 
if isReservedHeader(k) { @@ -576,7 +596,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) { } s.state = streamDone s.mu.Unlock() - if _, ok := err.(StreamError); ok { + if err != nil && !rstStream { rstStream = true rstError = http2.ErrCodeCancel } @@ -645,6 +665,8 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e select { case <-s.ctx.Done(): return ContextErr(s.ctx.Err()) + case <-s.done: + return io.EOF case <-t.ctx.Done(): return ErrConnClosing default: @@ -694,6 +716,8 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e } ltq, _, err := t.localSendQuota.get(size, s.waiters) if err != nil { + // Add the acquired quota back to transport. + t.sendQuotaPool.add(tq) return err } // even if ltq is smaller than size we don't adjust size since @@ -1110,16 +1134,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { }() s.mu.Lock() - if !endStream { - s.recvCompress = state.encoding - } if !s.headerDone { - if !endStream && len(state.mdata) > 0 { - s.header = state.mdata + if !endStream { + // Headers frame is not actually a trailers-only frame. 
+ isHeader = true + s.recvCompress = state.encoding + if len(state.mdata) > 0 { + s.header = state.mdata + } } close(s.headerChan) s.headerDone = true - isHeader = true } if !endStream || s.state == streamDone { s.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index 6d252c53a67..24c2c7e18c4 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -281,12 +281,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( buf := newRecvBuffer() s := &Stream{ - id: streamID, - st: t, - buf: buf, - fc: &inFlow{limit: uint32(t.initialWindowSize)}, - recvCompress: state.encoding, - method: state.method, + id: streamID, + st: t, + buf: buf, + fc: &inFlow{limit: uint32(t.initialWindowSize)}, + recvCompress: state.encoding, + method: state.method, + contentSubtype: state.contentSubtype, } if frame.StreamEnded() { @@ -730,7 +731,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { // first and create a slice of that exact size. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) - headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) if s.sendCompress != "" { headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) } @@ -749,9 +750,9 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { endStream: false, }) if t.stats != nil { - outHeader := &stats.OutHeader{ - //WireLength: // TODO(mmukhi): Revisit this later, if needed. - } + // Note: WireLength is not set in outHeader. 
+ // TODO(mmukhi): Revisit this later, if needed. + outHeader := &stats.OutHeader{} t.stats.HandleRPC(s.Context(), outHeader) } return nil @@ -792,7 +793,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. if !headersSent { headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) - headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) } headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) @@ -842,10 +843,6 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e var writeHeaderFrame bool s.mu.Lock() - if s.state == streamDone { - s.mu.Unlock() - return streamErrorf(codes.Unknown, "the stream has been done") - } if !s.headerOk { writeHeaderFrame = true } @@ -891,6 +888,8 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e } ltq, _, err := t.localSendQuota.get(size, s.waiters) if err != nil { + // Add the acquired quota back to transport. + t.sendQuotaPool.add(tq) return err } // even if ltq is smaller than size we don't adjust size since, diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go index 39f878cfd5b..34476773163 100644 --- a/vendor/google.golang.org/grpc/transport/http_util.go +++ b/vendor/google.golang.org/grpc/transport/http_util.go @@ -46,6 +46,12 @@ const ( // http2IOBufSize specifies the buffer size for sending frames. 
defaultWriteBufSize = 32 * 1024 defaultReadBufSize = 32 * 1024 + // baseContentType is the base content-type for gRPC. This is a valid + // content-type on its own, but can also include a content-subtype such as + // "proto" as a suffix after "+" or ";". See + // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + // for more details. + baseContentType = "application/grpc" ) var ( @@ -111,9 +117,10 @@ type decodeState struct { timeout time.Duration method string // key-value metadata map from the peer. - mdata map[string][]string - statsTags []byte - statsTrace []byte + mdata map[string][]string + statsTags []byte + statsTrace []byte + contentSubtype string } // isReservedHeader checks whether hdr belongs to HTTP2 headers @@ -149,17 +156,44 @@ func isWhitelistedPseudoHeader(hdr string) bool { } } -func validContentType(t string) bool { - e := "application/grpc" - if !strings.HasPrefix(t, e) { - return false +// contentSubtype returns the content-subtype for the given content-type. The +// given content-type must be a valid content-type that starts with +// "application/grpc". A content-subtype will follow "application/grpc" after a +// "+" or ";". See +// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for +// more details. +// +// If contentType is not a valid content-type for gRPC, the boolean +// will be false, otherwise true. If content-type == "application/grpc", +// "application/grpc+", or "application/grpc;", the boolean will be true, +// but no content-subtype will be returned. +// +// contentType is assumed to be lowercase already.
+func contentSubtype(contentType string) (string, bool) { + if contentType == baseContentType { + return "", true + } + if !strings.HasPrefix(contentType, baseContentType) { + return "", false + } + // guaranteed since != baseContentType and has baseContentType prefix + switch contentType[len(baseContentType)] { + case '+', ';': + // this will return true for "application/grpc+" or "application/grpc;" + // which the previous validContentType function tested to be valid, so we + // just say that no content-subtype is specified in this case + return contentType[len(baseContentType)+1:], true + default: + return "", false } - // Support variations on the content-type - // (e.g. "application/grpc+blah", "application/grpc;blah"). - if len(t) > len(e) && t[len(e)] != '+' && t[len(e)] != ';' { - return false +} + +// contentSubtype is assumed to be lowercase +func contentType(contentSubtype string) string { + if contentSubtype == "" { + return baseContentType } - return true + return baseContentType + "+" + contentSubtype } func (d *decodeState) status() *status.Status { @@ -247,9 +281,16 @@ func (d *decodeState) addMetadata(k, v string) { func (d *decodeState) processHeaderField(f hpack.HeaderField) error { switch f.Name { case "content-type": - if !validContentType(f.Value) { + contentSubtype, validContentType := contentSubtype(f.Value) + if !validContentType { return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value) } + d.contentSubtype = contentSubtype + // TODO: do we want to propagate the whole content-type in the metadata, + // or come up with a way to just propagate the content-subtype if it was set? + // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} + // in the metadata? 
+ d.addMetadata(f.Name, f.Value) case "grpc-encoding": d.encoding = f.Value case "grpc-status": diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go index 2e7bcaeaa27..e68f89ec459 100644 --- a/vendor/google.golang.org/grpc/transport/transport.go +++ b/vendor/google.golang.org/grpc/transport/transport.go @@ -246,6 +246,10 @@ type Stream struct { bytesReceived bool // indicates whether any bytes have been received on this stream unprocessed bool // set if the server sends a refused stream or GOAWAY including this stream + + // contentSubtype is the content-subtype for requests. + // this must be lowercase or the behavior is undefined. + contentSubtype string } func (s *Stream) waitOnHeader() error { @@ -321,6 +325,15 @@ func (s *Stream) ServerTransport() ServerTransport { return s.st } +// ContentSubtype returns the content-subtype for a request. For example, a +// content-subtype of "proto" will result in a content-type of +// "application/grpc+proto". This will always be lowercase. See +// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for +// more details. +func (s *Stream) ContentSubtype() string { + return s.contentSubtype +} + // Context returns the context of the stream. func (s *Stream) Context() context.Context { return s.ctx @@ -553,6 +566,14 @@ type CallHdr struct { // for performance purposes. // If it's false, new stream will never be flushed. Flush bool + + // ContentSubtype specifies the content-subtype for a request. For example, a + // content-subtype of "proto" will result in a content-type of + // "application/grpc+proto". The value of ContentSubtype must be all + // lowercase, otherwise the behavior is undefined. See + // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + // for more details. 
+ ContentSubtype string } // ClientTransport is the common interface for all gRPC client-side transport @@ -676,13 +697,13 @@ func (e ConnectionError) Origin() error { var ( // ErrConnClosing indicates that the transport is closing. ErrConnClosing = connectionErrorf(true, nil, "transport is closing") - // errStreamDrain indicates that the stream is rejected by the server because - // the server stops accepting new RPCs. - // TODO: delete this error; it is no longer necessary. - errStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs") + // errStreamDrain indicates that the stream is rejected because the + // connection is draining. This could be caused by goaway or balancer + // removing the address. + errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining") // StatusGoAway indicates that the server sent a GOAWAY that included this // stream's ID in unprocessed RPCs. - statusGoAway = status.New(codes.Unavailable, "the server stopped accepting new RPCs") + statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") ) // TODO: See if we can replace StreamError with status package errors. 
diff --git a/vendor/vendor.json b/vendor/vendor.json index 567dcce623d..d75f48abf88 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -437,212 +437,220 @@ "versionExact": "master" }, { - "checksumSHA1": "LXTQppZOmpZb8/zNBzfXmq3GDEg=", + "checksumSHA1": "DGnsWyF+0V5UX3i9VVgKYZ8NwG0=", "path": "google.golang.org/grpc", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "xBhmO0Vn4kzbmySioX+2gBImrkk=", "path": "google.golang.org/grpc/balancer", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "CPWX/IgaQSR3+78j4sPrvHNkW+U=", "path": "google.golang.org/grpc/balancer/base", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "DJ1AtOk4Pu7bqtUMob95Hw8HPNw=", "path": "google.golang.org/grpc/balancer/roundrobin", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { - "checksumSHA1": "bfmh2m3qW8bb6qpfS/D4Wcl4hZE=", + "checksumSHA1": "j8Qs+yfgwYYOtodB/1bSlbzV5rs=", "path": "google.golang.org/grpc/codes", - "revision": 
"7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "XH2WYcDNwVO47zYShREJjcYXm0Y=", "path": "google.golang.org/grpc/connectivity", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { - "checksumSHA1": "4DnDX81AOSyVP3UJ5tQmlNcG1MI=", + "checksumSHA1": "KthiDKNPHMeIu967enqtE4NaZzI=", "path": "google.golang.org/grpc/credentials", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { - "checksumSHA1": "9DImIDqmAMPO24loHJ77UVJTDxQ=", + "checksumSHA1": "mJTBJC0n9J2CV+tHX+dJosYOZmg=", "path": "google.golang.org/grpc/encoding", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" + }, + { + "checksumSHA1": "LKKkn7EYA+Do9Qwb2/SUKLFNxoo=", + "path": "google.golang.org/grpc/encoding/proto", + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "H7SuPUqbPcdbNqgl+k3ohuwMAwE=", "path": "google.golang.org/grpc/grpclb/grpc_lb_v1/messages", - "revision": 
"7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "ntHev01vgZgeIh5VFRmbLx/BSTo=", "path": "google.golang.org/grpc/grpclog", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "DyM0uqLtknaI4THSc3spn9XlL+g=", "path": "google.golang.org/grpc/health", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "6vY7tYjV84pnr3sDctzx53Bs8b0=", "path": "google.golang.org/grpc/health/grpc_health_v1", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "Qvf3zdmRCSsiM/VoBv0qB/naHtU=", "path": "google.golang.org/grpc/internal", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "hcuHgKp8W0wIzoCnNfKI8NUss5o=", "path": "google.golang.org/grpc/keepalive", - "revision": 
"7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { - "checksumSHA1": "KeUmTZV+2X46C49cKyjp+xM7fvw=", + "checksumSHA1": "X1BGbIb3xaxiAG4O1Ot5YjPlh4g=", "path": "google.golang.org/grpc/metadata", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "5dwF592DPvhF2Wcex3m7iV6aGRQ=", "path": "google.golang.org/grpc/naming", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "n5EgDdBqFMa2KQFhtl+FF/4gIFo=", "path": "google.golang.org/grpc/peer", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "JF/KBFCo5JwVtXfrZ2kJnFRC6W8=", "path": "google.golang.org/grpc/reflection", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "7Ax2K0St9CIi1rkA9Ju+2ERfe9E=", "path": 
"google.golang.org/grpc/reflection/grpc_reflection_v1alpha", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { - "checksumSHA1": "y8Ta+ctMP9CUTiPyPyxiD154d8w=", + "checksumSHA1": "qbA3XLvX0RTvaqQefvFDtE9GaJs=", "path": "google.golang.org/grpc/resolver", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "WpWF+bDzObsHf+bjoGpb/abeFxo=", "path": "google.golang.org/grpc/resolver/dns", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "zs9M4xE8Lyg4wvuYvR00XoBxmuw=", "path": "google.golang.org/grpc/resolver/passthrough", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "G9lgXNi7qClo5sM2s6TbTHLFR3g=", "path": "google.golang.org/grpc/stats", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": 
"v1.10.1" }, { - "checksumSHA1": "tUo+M0Cb0W9ZEIt5BH30wJz/Kjc=", + "checksumSHA1": "/7i6dC0tFTtGMxykj9VduLEfBCU=", "path": "google.golang.org/grpc/status", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { "checksumSHA1": "qvArRhlrww5WvRmbyMF2mUfbJew=", "path": "google.golang.org/grpc/tap", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" }, { - "checksumSHA1": "4PldZ/0JjX6SpJYaMByY1ozywnY=", + "checksumSHA1": "fgt81mMAzx0Zo0ZuI2Vv0/RYApA=", "path": "google.golang.org/grpc/transport", - "revision": "7cea4cc846bcf00cbb27595b07da5de875ef7de9", - "revisionTime": "2018-01-08T22:01:35Z", - "version": "v1.9.1", - "versionExact": "v1.9.1" + "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", + "revisionTime": "2018-03-28T23:38:55Z", + "version": "v1.10", + "versionExact": "v1.10.1" } ], "rootPath": "gitlab.com/gitlab-org/gitaly" -- GitLab From 53b6d2cc34e956dbbf759ccc72db235c38009089 Mon Sep 17 00:00:00 2001 From: Ben Kochie Date: Thu, 1 Nov 2018 13:38:07 +0100 Subject: [PATCH 2/2] Update vendor google.golang.org/grpc/...@v1.11 --- vendor/google.golang.org/grpc/README.md | 3 +- vendor/google.golang.org/grpc/call.go | 23 +- vendor/google.golang.org/grpc/clientconn.go | 31 +- vendor/google.golang.org/grpc/go16.go | 29 -- vendor/google.golang.org/grpc/go17.go | 29 -- vendor/google.golang.org/grpc/interceptor.go | 4 +- .../grpc/metadata/metadata.go | 8 +- .../google.golang.org/grpc/picker_wrapper.go | 19 +- .../grpc/reflection/serverreflection.go | 223 ++++++++----- 
.../grpc/resolver_conn_wrapper.go | 5 +- vendor/google.golang.org/grpc/rpc_util.go | 258 +++++++++++---- vendor/google.golang.org/grpc/server.go | 84 +++-- vendor/google.golang.org/grpc/stats/stats.go | 2 + .../google.golang.org/grpc/status/status.go | 14 +- vendor/google.golang.org/grpc/stream.go | 293 +++++++++++------- .../grpc/transport/handler_server.go | 5 +- .../grpc/transport/http2_client.go | 19 +- .../grpc/transport/http2_server.go | 10 +- .../grpc/transport/http_util.go | 4 +- .../grpc/transport/transport.go | 23 +- vendor/vendor.json | 228 +++++++------- 21 files changed, 785 insertions(+), 529 deletions(-) diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md index 118327bb17a..789adfd6536 100644 --- a/vendor/google.golang.org/grpc/README.md +++ b/vendor/google.golang.org/grpc/README.md @@ -16,8 +16,7 @@ $ go get -u google.golang.org/grpc Prerequisites ------------- -This requires Go 1.6 or later. Go 1.7 will be required as of the next gRPC-Go -release (1.8). +This requires Go 1.6 or later. Go 1.7 will be required soon. Constraints ----------- diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index a66e3c2d958..f73b7d5528f 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -27,12 +27,31 @@ import ( // // All errors returned by Invoke are compatible with the status package. func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { + // allow interceptor to see all applicable call options, which means those + // configured as defaults from dial option as well as per-call options + opts = combine(cc.dopts.callOptions, opts) + if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } return invoke(ctx, method, args, reply, cc, opts...) 
} +func combine(o1 []CallOption, o2 []CallOption) []CallOption { + // we don't use append because o1 could have extra capacity whose + // elements would be overwritten, which could cause inadvertent + // sharing (and race connditions) between concurrent calls + if len(o1) == 0 { + return o2 + } else if len(o2) == 0 { + return o1 + } + ret := make([]CallOption, len(o1)+len(o2)) + copy(ret, o1) + copy(ret[len(o1):], o2) + return ret +} + // Invoke sends the RPC request on the wire and returns after response is // received. This is typically called by generated code. // @@ -54,7 +73,7 @@ func invoke(ctx context.Context, method string, req, reply interface{}, cc *Clie } cs := csInt.(*clientStream) if err := cs.SendMsg(req); err != nil { - if !cs.c.failFast && cs.s.Unprocessed() && firstAttempt { + if !cs.c.failFast && cs.attempt.s.Unprocessed() && firstAttempt { // TODO: Add a field to header for grpc-transparent-retry-attempts firstAttempt = false continue @@ -62,7 +81,7 @@ func invoke(ctx context.Context, method string, req, reply interface{}, cc *Clie return err } if err := cs.RecvMsg(reply); err != nil { - if !cs.c.failFast && cs.s.Unprocessed() && firstAttempt { + if !cs.c.failFast && cs.attempt.s.Unprocessed() && firstAttempt { // TODO: Add a field to header for grpc-transparent-retry-attempts firstAttempt = false continue diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 208e3c9b69d..6385407292a 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -45,6 +45,11 @@ import ( "google.golang.org/grpc/transport" ) +const ( + // minimum time to give a connection to complete + minConnectTimeout = 20 * time.Second +) + var ( // ErrClientConnClosing indicates that the operation is illegal because // the ClientConn is closing. 
@@ -60,8 +65,11 @@ var ( errConnUnavailable = errors.New("grpc: the connection is unavailable") // errBalancerClosed indicates that the balancer is closed. errBalancerClosed = errors.New("grpc: balancer is closed") - // minimum time to give a connection to complete - minConnectTimeout = 20 * time.Second + // We use an accessor so that minConnectTimeout can be + // atomically read and updated while testing. + getMinConnectTimeout = func() time.Duration { + return minConnectTimeout + } ) // The following errors are returned from Dial and DialContext @@ -435,7 +443,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if cc.dopts.copts.Dialer == nil { cc.dopts.copts.Dialer = newProxyDialer( func(ctx context.Context, addr string) (net.Conn, error) { - return dialContext(ctx, "tcp", addr) + network, addr := parseDialTarget(addr) + return dialContext(ctx, network, addr) }, ) } @@ -883,7 +892,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { // the corresponding MethodConfig. // If there isn't an exact match for the input method, we look for the default config // under the service (i.e /service/). If there is a default MethodConfig for -// the serivce, we return it. +// the service, we return it. // Otherwise, we return an empty MethodConfig. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { // TODO: Avoid the locking here. @@ -944,7 +953,7 @@ func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { // Close tears down the ClientConn and all underlying connections. func (cc *ClientConn) Close() error { - cc.cancel() + defer cc.cancel() cc.mu.Lock() if cc.conns == nil { @@ -1073,7 +1082,7 @@ func (ac *addrConn) resetTransport() error { // connection. backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration. // This will be the duration that dial gets to finish. 
- dialDuration := minConnectTimeout + dialDuration := getMinConnectTimeout() if backoffFor > dialDuration { // Give dial more time as we keep failing to connect. dialDuration = backoffFor @@ -1147,15 +1156,7 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt) if err != nil { cancel() - if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { - ac.mu.Lock() - if ac.state != connectivity.Shutdown { - ac.state = connectivity.TransientFailure - ac.cc.handleSubConnStateChange(ac.acbw, ac.state) - } - ac.mu.Unlock() - return false, err - } + ac.cc.blockingpicker.updateConnectionError(err) ac.mu.Lock() if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. diff --git a/vendor/google.golang.org/grpc/go16.go b/vendor/google.golang.org/grpc/go16.go index 0ae4dbda9e6..535ee9356f3 100644 --- a/vendor/google.golang.org/grpc/go16.go +++ b/vendor/google.golang.org/grpc/go16.go @@ -25,7 +25,6 @@ import ( "io" "net" "net/http" - "os" "golang.org/x/net/context" "google.golang.org/grpc/codes" @@ -69,31 +68,3 @@ func toRPCErr(err error) error { } return status.Error(codes.Unknown, err.Error()) } - -// convertCode converts a standard Go error into its canonical code. Note that -// this is only used to translate the error returned by the server applications. 
-func convertCode(err error) codes.Code { - switch err { - case nil: - return codes.OK - case io.EOF: - return codes.OutOfRange - case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF: - return codes.FailedPrecondition - case os.ErrInvalid: - return codes.InvalidArgument - case context.Canceled: - return codes.Canceled - case context.DeadlineExceeded: - return codes.DeadlineExceeded - } - switch { - case os.IsExist(err): - return codes.AlreadyExists - case os.IsNotExist(err): - return codes.NotFound - case os.IsPermission(err): - return codes.PermissionDenied - } - return codes.Unknown -} diff --git a/vendor/google.golang.org/grpc/go17.go b/vendor/google.golang.org/grpc/go17.go index 53908828083..ec676a93c39 100644 --- a/vendor/google.golang.org/grpc/go17.go +++ b/vendor/google.golang.org/grpc/go17.go @@ -26,7 +26,6 @@ import ( "io" "net" "net/http" - "os" netctx "golang.org/x/net/context" "google.golang.org/grpc/codes" @@ -70,31 +69,3 @@ func toRPCErr(err error) error { } return status.Error(codes.Unknown, err.Error()) } - -// convertCode converts a standard Go error into its canonical code. Note that -// this is only used to translate the error returned by the server applications. 
-func convertCode(err error) codes.Code { - switch err { - case nil: - return codes.OK - case io.EOF: - return codes.OutOfRange - case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF: - return codes.FailedPrecondition - case os.ErrInvalid: - return codes.InvalidArgument - case context.Canceled, netctx.Canceled: - return codes.Canceled - case context.DeadlineExceeded, netctx.DeadlineExceeded: - return codes.DeadlineExceeded - } - switch { - case os.IsExist(err): - return codes.AlreadyExists - case os.IsNotExist(err): - return codes.NotFound - case os.IsPermission(err): - return codes.PermissionDenied - } - return codes.Unknown -} diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go index 06dc825b9fb..1f6ef678035 100644 --- a/vendor/google.golang.org/grpc/interceptor.go +++ b/vendor/google.golang.org/grpc/interceptor.go @@ -48,7 +48,9 @@ type UnaryServerInfo struct { } // UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal -// execution of a unary RPC. +// execution of a unary RPC. If a UnaryHandler returns an error, it should be produced by the +// status package, or else gRPC will use codes.Unknown as the status code and err.Error() as +// the status message of the RPC. type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error) // UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. 
info diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go index 15662b5d890..e7c994673c0 100644 --- a/vendor/google.golang.org/grpc/metadata/metadata.go +++ b/vendor/google.golang.org/grpc/metadata/metadata.go @@ -131,7 +131,11 @@ func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context panic(fmt.Sprintf("metadata: AppendToOutgoingContext got an odd number of input pairs for metadata: %d", len(kv))) } md, _ := ctx.Value(mdOutgoingKey{}).(rawMD) - return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: append(md.added, kv)}) + added := make([][]string, len(md.added)+1) + copy(added, md.added) + added[len(added)-1] = make([]string, len(kv)) + copy(added[len(added)-1], kv) + return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: added}) } // FromIncomingContext returns the incoming metadata in ctx if it exists. The @@ -159,7 +163,7 @@ func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) { // FromOutgoingContext returns the outgoing metadata in ctx if it exists. The // returned MD should not be modified. Writing to it may cause races. -// Modification should be made to the copies of the returned MD. +// Modification should be made to copies of the returned MD. func FromOutgoingContext(ctx context.Context) (MD, bool) { raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD) if !ok { diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go index db82bfb3a0f..4d0082593d1 100644 --- a/vendor/google.golang.org/grpc/picker_wrapper.go +++ b/vendor/google.golang.org/grpc/picker_wrapper.go @@ -36,6 +36,10 @@ type pickerWrapper struct { done bool blockingCh chan struct{} picker balancer.Picker + + // The latest connection happened. 
+ connErrMu sync.Mutex + connErr error } func newPickerWrapper() *pickerWrapper { @@ -43,6 +47,19 @@ func newPickerWrapper() *pickerWrapper { return bp } +func (bp *pickerWrapper) updateConnectionError(err error) { + bp.connErrMu.Lock() + bp.connErr = err + bp.connErrMu.Unlock() +} + +func (bp *pickerWrapper) connectionError() error { + bp.connErrMu.Lock() + err := bp.connErr + bp.connErrMu.Unlock() + return err +} + // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. func (bp *pickerWrapper) updatePicker(p balancer.Picker) { bp.mu.Lock() @@ -107,7 +124,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer. if !failfast { continue } - return nil, nil, status.Errorf(codes.Unavailable, "%v", err) + return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError()) default: // err is some other error. return nil, nil, toRPCErr(err) diff --git a/vendor/google.golang.org/grpc/reflection/serverreflection.go b/vendor/google.golang.org/grpc/reflection/serverreflection.go index 1bfbf3e780c..dd22a2da784 100644 --- a/vendor/google.golang.org/grpc/reflection/serverreflection.go +++ b/vendor/google.golang.org/grpc/reflection/serverreflection.go @@ -45,7 +45,8 @@ import ( "io" "io/ioutil" "reflect" - "strings" + "sort" + "sync" "github.com/golang/protobuf/proto" dpb "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -57,8 +58,10 @@ import ( type serverReflectionServer struct { s *grpc.Server - // TODO add more cache if necessary - serviceInfo map[string]grpc.ServiceInfo // cache for s.GetServiceInfo() + + initSymbols sync.Once + serviceNames []string + symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files } // Register registers the server reflection service on the given gRPC server. 
@@ -76,6 +79,112 @@ type protoMessage interface { Descriptor() ([]byte, []int) } +func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) { + s.initSymbols.Do(func() { + serviceInfo := s.s.GetServiceInfo() + + s.symbols = map[string]*dpb.FileDescriptorProto{} + s.serviceNames = make([]string, 0, len(serviceInfo)) + processed := map[string]struct{}{} + for svc, info := range serviceInfo { + s.serviceNames = append(s.serviceNames, svc) + fdenc, ok := parseMetadata(info.Metadata) + if !ok { + continue + } + fd, err := decodeFileDesc(fdenc) + if err != nil { + continue + } + s.processFile(fd, processed) + } + sort.Strings(s.serviceNames) + }) + + return s.serviceNames, s.symbols +} + +func (s *serverReflectionServer) processFile(fd *dpb.FileDescriptorProto, processed map[string]struct{}) { + filename := fd.GetName() + if _, ok := processed[filename]; ok { + return + } + processed[filename] = struct{}{} + + prefix := fd.GetPackage() + + for _, msg := range fd.MessageType { + s.processMessage(fd, prefix, msg) + } + for _, en := range fd.EnumType { + s.processEnum(fd, prefix, en) + } + for _, ext := range fd.Extension { + s.processField(fd, prefix, ext) + } + for _, svc := range fd.Service { + svcName := fqn(prefix, svc.GetName()) + s.symbols[svcName] = fd + for _, meth := range svc.Method { + name := fqn(svcName, meth.GetName()) + s.symbols[name] = fd + } + } + + for _, dep := range fd.Dependency { + fdenc := proto.FileDescriptor(dep) + fdDep, err := decodeFileDesc(fdenc) + if err != nil { + continue + } + s.processFile(fdDep, processed) + } +} + +func (s *serverReflectionServer) processMessage(fd *dpb.FileDescriptorProto, prefix string, msg *dpb.DescriptorProto) { + msgName := fqn(prefix, msg.GetName()) + s.symbols[msgName] = fd + + for _, nested := range msg.NestedType { + s.processMessage(fd, msgName, nested) + } + for _, en := range msg.EnumType { + s.processEnum(fd, msgName, en) + } + for _, ext := range 
msg.Extension { + s.processField(fd, msgName, ext) + } + for _, fld := range msg.Field { + s.processField(fd, msgName, fld) + } + for _, oneof := range msg.OneofDecl { + oneofName := fqn(msgName, oneof.GetName()) + s.symbols[oneofName] = fd + } +} + +func (s *serverReflectionServer) processEnum(fd *dpb.FileDescriptorProto, prefix string, en *dpb.EnumDescriptorProto) { + enName := fqn(prefix, en.GetName()) + s.symbols[enName] = fd + + for _, val := range en.Value { + valName := fqn(enName, val.GetName()) + s.symbols[valName] = fd + } +} + +func (s *serverReflectionServer) processField(fd *dpb.FileDescriptorProto, prefix string, fld *dpb.FieldDescriptorProto) { + fldName := fqn(prefix, fld.GetName()) + s.symbols[fldName] = fd +} + +func fqn(prefix, name string) string { + if prefix == "" { + return name + } + return prefix + "." + name +} + // fileDescForType gets the file descriptor for the given type. // The given type should be a proto message. func (s *serverReflectionServer) fileDescForType(st reflect.Type) (*dpb.FileDescriptorProto, error) { @@ -85,12 +194,12 @@ func (s *serverReflectionServer) fileDescForType(st reflect.Type) (*dpb.FileDesc } enc, _ := m.Descriptor() - return s.decodeFileDesc(enc) + return decodeFileDesc(enc) } // decodeFileDesc does decompression and unmarshalling on the given // file descriptor byte slice. 
-func (s *serverReflectionServer) decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) { +func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) { raw, err := decompress(enc) if err != nil { return nil, fmt.Errorf("failed to decompress enc: %v", err) @@ -116,7 +225,7 @@ func decompress(b []byte) ([]byte, error) { return out, nil } -func (s *serverReflectionServer) typeForName(name string) (reflect.Type, error) { +func typeForName(name string) (reflect.Type, error) { pt := proto.MessageType(name) if pt == nil { return nil, fmt.Errorf("unknown type: %q", name) @@ -126,7 +235,7 @@ func (s *serverReflectionServer) typeForName(name string) (reflect.Type, error) return st, nil } -func (s *serverReflectionServer) fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) { +func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) { m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message) if !ok { return nil, fmt.Errorf("failed to create message from type: %v", st) @@ -144,7 +253,7 @@ func (s *serverReflectionServer) fileDescContainingExtension(st reflect.Type, ex return nil, fmt.Errorf("failed to find registered extension for extension number %v", ext) } - return s.decodeFileDesc(proto.FileDescriptor(extDesc.Filename)) + return decodeFileDesc(proto.FileDescriptor(extDesc.Filename)) } func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) { @@ -168,53 +277,13 @@ func (s *serverReflectionServer) fileDescEncodingByFilename(name string) ([]byte if enc == nil { return nil, fmt.Errorf("unknown file: %v", name) } - fd, err := s.decodeFileDesc(enc) + fd, err := decodeFileDesc(enc) if err != nil { return nil, err } return proto.Marshal(fd) } -// serviceMetadataForSymbol finds the metadata for name in s.serviceInfo. -// name should be a service name or a method name. 
-func (s *serverReflectionServer) serviceMetadataForSymbol(name string) (interface{}, error) { - if s.serviceInfo == nil { - s.serviceInfo = s.s.GetServiceInfo() - } - - // Check if it's a service name. - if info, ok := s.serviceInfo[name]; ok { - return info.Metadata, nil - } - - // Check if it's a method name. - pos := strings.LastIndex(name, ".") - // Not a valid method name. - if pos == -1 { - return nil, fmt.Errorf("unknown symbol: %v", name) - } - - info, ok := s.serviceInfo[name[:pos]] - // Substring before last "." is not a service name. - if !ok { - return nil, fmt.Errorf("unknown symbol: %v", name) - } - - // Search the method name in info.Methods. - var found bool - for _, m := range info.Methods { - if m.Name == name[pos+1:] { - found = true - break - } - } - if found { - return info.Metadata, nil - } - - return nil, fmt.Errorf("unknown symbol: %v", name) -} - // parseMetadata finds the file descriptor bytes specified meta. // For SupportPackageIsVersion4, m is the name of the proto file, we // call proto.FileDescriptor to get the byte slice. @@ -237,33 +306,21 @@ func parseMetadata(meta interface{}) ([]byte, bool) { // does marshalling on it and returns the marshalled result. // The given symbol can be a type, a service or a method. func (s *serverReflectionServer) fileDescEncodingContainingSymbol(name string) ([]byte, error) { - var ( - fd *dpb.FileDescriptorProto - ) - // Check if it's a type name. - if st, err := s.typeForName(name); err == nil { - fd, err = s.fileDescForType(st) - if err != nil { - return nil, err - } - } else { // Check if it's a service name or a method name. - meta, err := s.serviceMetadataForSymbol(name) - - // Metadata not found. - if err != nil { - return nil, err - } - - // Metadata not valid. 
- enc, ok := parseMetadata(meta) - if !ok { - return nil, fmt.Errorf("invalid file descriptor for symbol: %v", name) + _, symbols := s.getSymbols() + fd := symbols[name] + if fd == nil { + // Check if it's a type name that was not present in the + // transitive dependencies of the registered services. + if st, err := typeForName(name); err == nil { + fd, err = s.fileDescForType(st) + if err != nil { + return nil, err + } } + } - fd, err = s.decodeFileDesc(enc) - if err != nil { - return nil, err - } + if fd == nil { + return nil, fmt.Errorf("unknown symbol: %v", name) } return proto.Marshal(fd) @@ -272,11 +329,11 @@ func (s *serverReflectionServer) fileDescEncodingContainingSymbol(name string) ( // fileDescEncodingContainingExtension finds the file descriptor containing given extension, // does marshalling on it and returns the marshalled result. func (s *serverReflectionServer) fileDescEncodingContainingExtension(typeName string, extNum int32) ([]byte, error) { - st, err := s.typeForName(typeName) + st, err := typeForName(typeName) if err != nil { return nil, err } - fd, err := s.fileDescContainingExtension(st, extNum) + fd, err := fileDescContainingExtension(st, extNum) if err != nil { return nil, err } @@ -285,7 +342,7 @@ func (s *serverReflectionServer) fileDescEncodingContainingExtension(typeName st // allExtensionNumbersForTypeName returns all extension numbers for the given type. 
func (s *serverReflectionServer) allExtensionNumbersForTypeName(name string) ([]int32, error) { - st, err := s.typeForName(name) + st, err := typeForName(name) if err != nil { return nil, err } @@ -374,14 +431,12 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio } } case *rpb.ServerReflectionRequest_ListServices: - if s.serviceInfo == nil { - s.serviceInfo = s.s.GetServiceInfo() - } - serviceResponses := make([]*rpb.ServiceResponse, 0, len(s.serviceInfo)) - for n := range s.serviceInfo { - serviceResponses = append(serviceResponses, &rpb.ServiceResponse{ + svcNames, _ := s.getSymbols() + serviceResponses := make([]*rpb.ServiceResponse, len(svcNames)) + for i, n := range svcNames { + serviceResponses[i] = &rpb.ServiceResponse{ Name: n, - }) + } } out.MessageResponse = &rpb.ServerReflectionResponse_ListServicesResponse{ ListServicesResponse: &rpb.ListServiceResponse{ diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go index d394c5349db..75b8ce1eb6c 100644 --- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go +++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go @@ -57,7 +57,10 @@ func parseTarget(target string) (ret resolver.Target) { if !ok { return resolver.Target{Endpoint: target} } - ret.Authority, ret.Endpoint, _ = split2(ret.Endpoint, "/") + ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/") + if !ok { + return resolver.Target{Endpoint: target} + } return ret } diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index 58e6c538e5b..00a99766066 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -22,9 +22,11 @@ import ( "bytes" "compress/gzip" "encoding/binary" + "fmt" "io" "io/ioutil" "math" + "net/url" "strings" "sync" "time" @@ -55,13 +57,29 @@ type gzipCompressor struct { // NewGZIPCompressor creates a Compressor based on GZIP. 
func NewGZIPCompressor() Compressor { + c, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression) + return c +} + +// NewGZIPCompressorWithLevel is like NewGZIPCompressor but specifies the gzip compression level instead +// of assuming DefaultCompression. +// +// The error returned will be nil if the level is valid. +func NewGZIPCompressorWithLevel(level int) (Compressor, error) { + if level < gzip.DefaultCompression || level > gzip.BestCompression { + return nil, fmt.Errorf("grpc: invalid compression level: %d", level) + } return &gzipCompressor{ pool: sync.Pool{ New: func() interface{} { - return gzip.NewWriter(ioutil.Discard) + w, err := gzip.NewWriterLevel(ioutil.Discard, level) + if err != nil { + panic(err) + } + return w }, }, - } + }, nil } func (c *gzipCompressor) Do(w io.Writer, p []byte) error { @@ -127,7 +145,7 @@ func (d *gzipDecompressor) Type() string { type callInfo struct { compressorType string failFast bool - stream *transport.Stream + stream *clientStream traceInfo traceInfo // in trace.go maxReceiveMessageSize *int maxSendMessageSize *int @@ -160,46 +178,66 @@ type EmptyCallOption struct{} func (EmptyCallOption) before(*callInfo) error { return nil } func (EmptyCallOption) after(*callInfo) {} -type beforeCall func(c *callInfo) error - -func (o beforeCall) before(c *callInfo) error { return o(c) } -func (o beforeCall) after(c *callInfo) {} - -type afterCall func(c *callInfo) - -func (o afterCall) before(c *callInfo) error { return nil } -func (o afterCall) after(c *callInfo) { o(c) } - // Header returns a CallOptions that retrieves the header metadata // for a unary RPC. func Header(md *metadata.MD) CallOption { - return afterCall(func(c *callInfo) { - if c.stream != nil { - *md, _ = c.stream.Header() - } - }) + return HeaderCallOption{HeaderAddr: md} +} + +// HeaderCallOption is a CallOption for collecting response header metadata. +// The metadata field will be populated *after* the RPC completes. +// This is an EXPERIMENTAL API. 
+type HeaderCallOption struct { + HeaderAddr *metadata.MD +} + +func (o HeaderCallOption) before(c *callInfo) error { return nil } +func (o HeaderCallOption) after(c *callInfo) { + if c.stream != nil { + *o.HeaderAddr, _ = c.stream.Header() + } } // Trailer returns a CallOptions that retrieves the trailer metadata // for a unary RPC. func Trailer(md *metadata.MD) CallOption { - return afterCall(func(c *callInfo) { - if c.stream != nil { - *md = c.stream.Trailer() - } - }) + return TrailerCallOption{TrailerAddr: md} +} + +// TrailerCallOption is a CallOption for collecting response trailer metadata. +// The metadata field will be populated *after* the RPC completes. +// This is an EXPERIMENTAL API. +type TrailerCallOption struct { + TrailerAddr *metadata.MD +} + +func (o TrailerCallOption) before(c *callInfo) error { return nil } +func (o TrailerCallOption) after(c *callInfo) { + if c.stream != nil { + *o.TrailerAddr = c.stream.Trailer() + } } // Peer returns a CallOption that retrieves peer information for a // unary RPC. func Peer(p *peer.Peer) CallOption { - return afterCall(func(c *callInfo) { - if c.stream != nil { - if x, ok := peer.FromContext(c.stream.Context()); ok { - *p = *x - } + return PeerCallOption{PeerAddr: p} +} + +// PeerCallOption is a CallOption for collecting the identity of the remote +// peer. The peer field will be populated *after* the RPC completes. +// This is an EXPERIMENTAL API. +type PeerCallOption struct { + PeerAddr *peer.Peer +} + +func (o PeerCallOption) before(c *callInfo) error { return nil } +func (o PeerCallOption) after(c *callInfo) { + if c.stream != nil { + if x, ok := peer.FromContext(c.stream.Context()); ok { + *o.PeerAddr = *x } - }) + } } // FailFast configures the action to take when an RPC is attempted on broken @@ -213,36 +251,76 @@ func Peer(p *peer.Peer) CallOption { // // By default, RPCs are "Fail Fast". 
func FailFast(failFast bool) CallOption { - return beforeCall(func(c *callInfo) error { - c.failFast = failFast - return nil - }) + return FailFastCallOption{FailFast: failFast} +} + +// FailFastCallOption is a CallOption for indicating whether an RPC should fail +// fast or not. +// This is an EXPERIMENTAL API. +type FailFastCallOption struct { + FailFast bool } +func (o FailFastCallOption) before(c *callInfo) error { + c.failFast = o.FailFast + return nil +} +func (o FailFastCallOption) after(c *callInfo) { return } + // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive. func MaxCallRecvMsgSize(s int) CallOption { - return beforeCall(func(o *callInfo) error { - o.maxReceiveMessageSize = &s - return nil - }) + return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: s} +} + +// MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message +// size the client can receive. +// This is an EXPERIMENTAL API. +type MaxRecvMsgSizeCallOption struct { + MaxRecvMsgSize int } +func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error { + c.maxReceiveMessageSize = &o.MaxRecvMsgSize + return nil +} +func (o MaxRecvMsgSizeCallOption) after(c *callInfo) { return } + // MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send. func MaxCallSendMsgSize(s int) CallOption { - return beforeCall(func(o *callInfo) error { - o.maxSendMessageSize = &s - return nil - }) + return MaxSendMsgSizeCallOption{MaxSendMsgSize: s} } +// MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message +// size the client can send. +// This is an EXPERIMENTAL API. 
+type MaxSendMsgSizeCallOption struct { + MaxSendMsgSize int +} + +func (o MaxSendMsgSizeCallOption) before(c *callInfo) error { + c.maxSendMessageSize = &o.MaxSendMsgSize + return nil +} +func (o MaxSendMsgSizeCallOption) after(c *callInfo) { return } + // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials // for a call. func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption { - return beforeCall(func(c *callInfo) error { - c.creds = creds - return nil - }) + return PerRPCCredsCallOption{Creds: creds} +} + +// PerRPCCredsCallOption is a CallOption that indicates the per-RPC +// credentials to use for the call. +// This is an EXPERIMENTAL API. +type PerRPCCredsCallOption struct { + Creds credentials.PerRPCCredentials +} + +func (o PerRPCCredsCallOption) before(c *callInfo) error { + c.creds = o.Creds + return nil } +func (o PerRPCCredsCallOption) after(c *callInfo) { return } // UseCompressor returns a CallOption which sets the compressor used when // sending the request. If WithCompressor is also set, UseCompressor has @@ -250,11 +328,20 @@ func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption { // // This API is EXPERIMENTAL. func UseCompressor(name string) CallOption { - return beforeCall(func(c *callInfo) error { - c.compressorType = name - return nil - }) + return CompressorCallOption{CompressorType: name} +} + +// CompressorCallOption is a CallOption that indicates the compressor to use. +// This is an EXPERIMENTAL API. +type CompressorCallOption struct { + CompressorType string +} + +func (o CompressorCallOption) before(c *callInfo) error { + c.compressorType = o.CompressorType + return nil } +func (o CompressorCallOption) after(c *callInfo) { return } // CallContentSubtype returns a CallOption that will set the content-subtype // for a call. 
For example, if content-subtype is "json", the Content-Type over @@ -273,13 +360,22 @@ func UseCompressor(name string) CallOption { // response messages, with the content-subtype set to the given contentSubtype // here for requests. func CallContentSubtype(contentSubtype string) CallOption { - contentSubtype = strings.ToLower(contentSubtype) - return beforeCall(func(c *callInfo) error { - c.contentSubtype = contentSubtype - return nil - }) + return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)} } +// ContentSubtypeCallOption is a CallOption that indicates the content-subtype +// used for marshaling messages. +// This is an EXPERIMENTAL API. +type ContentSubtypeCallOption struct { + ContentSubtype string +} + +func (o ContentSubtypeCallOption) before(c *callInfo) error { + c.contentSubtype = o.ContentSubtype + return nil +} +func (o ContentSubtypeCallOption) after(c *callInfo) { return } + // CallCustomCodec returns a CallOption that will set the given Codec to be // used for all request and response messages for a call. The result of calling // String() will be used as the content-subtype in a case-insensitive manner. @@ -293,12 +389,22 @@ func CallContentSubtype(contentSubtype string) CallOption { // This function is provided for advanced users; prefer to use only // CallContentSubtype to select a registered codec instead. func CallCustomCodec(codec Codec) CallOption { - return beforeCall(func(c *callInfo) error { - c.codec = codec - return nil - }) + return CustomCodecCallOption{Codec: codec} +} + +// CustomCodecCallOption is a CallOption that indicates the codec used for +// marshaling messages. +// This is an EXPERIMENTAL API. +type CustomCodecCallOption struct { + Codec Codec } +func (o CustomCodecCallOption) before(c *callInfo) error { + c.codec = o.Codec + return nil +} +func (o CustomCodecCallOption) after(c *callInfo) { return } + // The format of the payload: compressed or not? 
type payloadFormat uint8 @@ -557,6 +663,40 @@ func setCallInfoCodec(c *callInfo) error { return nil } +// parseDialTarget returns the network and address to pass to dialer +func parseDialTarget(target string) (net string, addr string) { + net = "tcp" + + m1 := strings.Index(target, ":") + m2 := strings.Index(target, ":/") + + // handle unix:addr which will fail with url.Parse + if m1 >= 0 && m2 < 0 { + if n := target[0:m1]; n == "unix" { + net = n + addr = target[m1+1:] + return net, addr + } + } + if m2 >= 0 { + t, err := url.Parse(target) + if err != nil { + return net, target + } + scheme := t.Scheme + addr = t.Path + if scheme == "unix" { + net = scheme + if addr == "" { + addr = t.Host + } + return net, addr + } + } + + return net, target +} + // The SupportPackageIsVersion variables are referenced from generated protocol // buffer files to ensure compatibility with the gRPC version used. The latest // support package version is 5. @@ -572,6 +712,6 @@ const ( ) // Version is the current grpc version. 
-const Version = "1.10.1" +const Version = "1.11.3" const grpcUA = "grpc-go/" + Version diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 0f7ff5d6022..c6b413b9d9d 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -777,13 +777,15 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { sh := s.opts.statsHandler if sh != nil { + beginTime := time.Now() begin := &stats.Begin{ - BeginTime: time.Now(), + BeginTime: beginTime, } sh.HandleRPC(stream.Context(), begin) defer func() { end := &stats.End{ - EndTime: time.Now(), + BeginTime: beginTime, + EndTime: time.Now(), } if err != nil && err != io.EOF { end.Error = toRPCErr(err) @@ -917,12 +919,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return nil } - reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) + ctx := NewContextWithServerTransportStream(stream.Context(), stream) + reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) if appErr != nil { appStatus, ok := status.FromError(appErr) if !ok { // Convert appErr if it is not a grpc status error. - appErr = status.Error(convertCode(appErr), appErr.Error()) + appErr = status.Error(codes.Unknown, appErr.Error()) appStatus, _ = status.FromError(appErr) } if trInfo != nil { @@ -977,13 +980,15 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. 
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { sh := s.opts.statsHandler if sh != nil { + beginTime := time.Now() begin := &stats.Begin{ - BeginTime: time.Now(), + BeginTime: beginTime, } sh.HandleRPC(stream.Context(), begin) defer func() { end := &stats.End{ - EndTime: time.Now(), + BeginTime: beginTime, + EndTime: time.Now(), } if err != nil && err != io.EOF { end.Error = toRPCErr(err) @@ -991,7 +996,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp sh.HandleRPC(stream.Context(), end) }() } + ctx := NewContextWithServerTransportStream(stream.Context(), stream) ss := &serverStream{ + ctx: ctx, t: t, s: stream, p: &parser{r: stream}, @@ -1065,7 +1072,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp case transport.StreamError: appStatus = status.New(err.Code, err.Desc) default: - appStatus = status.New(convertCode(appErr), appErr.Error()) + appStatus = status.New(codes.Unknown, appErr.Error()) } appErr = appStatus.Err() } @@ -1085,7 +1092,6 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.mu.Unlock() } return t.WriteStatus(ss.s, status.New(codes.OK, "")) - } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { @@ -1167,6 +1173,40 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str } } +// The key to save ServerTransportStream in the context. +type streamKey struct{} + +// NewContextWithServerTransportStream creates a new context from ctx and +// attaches stream to it. +// +// This API is EXPERIMENTAL. +func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { + return context.WithValue(ctx, streamKey{}, stream) +} + +// ServerTransportStream is a minimal interface that a transport stream must +// implement. 
This can be used to mock an actual transport stream for tests of +// handler code that use, for example, grpc.SetHeader (which requires some +// stream to be in context). +// +// See also NewContextWithServerTransportStream. +// +// This API is EXPERIMENTAL. +type ServerTransportStream interface { + Method() string + SetHeader(md metadata.MD) error + SendHeader(md metadata.MD) error + SetTrailer(md metadata.MD) error +} + +// serverStreamFromContext returns the server stream saved in ctx. Returns +// nil if the given context has no stream associated with it (which implies +// it is not an RPC invocation context). +func serverTransportStreamFromContext(ctx context.Context) ServerTransportStream { + s, _ := ctx.Value(streamKey{}).(ServerTransportStream) + return s +} + // Stop stops the gRPC server. It immediately closes all open // connections and listeners. // It cancels all active RPCs on the server side and the corresponding @@ -1287,8 +1327,8 @@ func SetHeader(ctx context.Context, md metadata.MD) error { if md.Len() == 0 { return nil } - stream, ok := transport.StreamFromContext(ctx) - if !ok { + stream := serverTransportStreamFromContext(ctx) + if stream == nil { return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetHeader(md) @@ -1297,15 +1337,11 @@ func SetHeader(ctx context.Context, md metadata.MD) error { // SendHeader sends header metadata. It may be called at most once. // The provided md and headers set by SetHeader() will be sent. 
func SendHeader(ctx context.Context, md metadata.MD) error { - stream, ok := transport.StreamFromContext(ctx) - if !ok { + stream := serverTransportStreamFromContext(ctx) + if stream == nil { return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } - t := stream.ServerTransport() - if t == nil { - grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream) - } - if err := t.WriteHeader(stream, md); err != nil { + if err := stream.SendHeader(md); err != nil { return toRPCErr(err) } return nil @@ -1317,9 +1353,19 @@ func SetTrailer(ctx context.Context, md metadata.MD) error { if md.Len() == 0 { return nil } - stream, ok := transport.StreamFromContext(ctx) - if !ok { + stream := serverTransportStreamFromContext(ctx) + if stream == nil { return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetTrailer(md) } + +// Method returns the method string for the server context. The returned +// string is in the format of "/service/method". +func Method(ctx context.Context) (string, bool) { + s := serverTransportStreamFromContext(ctx) + if s == nil { + return "", false + } + return s.Method(), true +} diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go index d5aa2f793bf..3f13190a0ac 100644 --- a/vendor/google.golang.org/grpc/stats/stats.go +++ b/vendor/google.golang.org/grpc/stats/stats.go @@ -169,6 +169,8 @@ func (s *OutTrailer) isRPCStats() {} type End struct { // Client is true if this End is from client side. Client bool + // BeginTime is the time when the RPC began. + BeginTime time.Time // EndTime is the time when the RPC ends. EndTime time.Time // Error is the error the RPC ended with. 
It is an error generated from diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go index 3a42dc6de02..9c61b094508 100644 --- a/vendor/google.golang.org/grpc/status/status.go +++ b/vendor/google.golang.org/grpc/status/status.go @@ -46,7 +46,7 @@ func (se *statusError) Error() string { return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(p.GetCode()), p.GetMessage()) } -func (se *statusError) status() *Status { +func (se *statusError) GRPCStatus() *Status { return &Status{s: (*spb.Status)(se)} } @@ -120,14 +120,14 @@ func FromProto(s *spb.Status) *Status { } // FromError returns a Status representing err if it was produced from this -// package. Otherwise, ok is false and a Status is returned with codes.Unknown -// and the original error message. +// package or has a method `GRPCStatus() *Status`. Otherwise, ok is false and a +// Status is returned with codes.Unknown and the original error message. func FromError(err error) (s *Status, ok bool) { if err == nil { return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true } - if se, ok := err.(*statusError); ok { - return se.status(), true + if se, ok := err.(interface{ GRPCStatus() *Status }); ok { + return se.GRPCStatus(), true } return New(codes.Unknown, err.Error()), false } @@ -182,8 +182,8 @@ func Code(err error) codes.Code { if err == nil { return codes.OK } - if se, ok := err.(*statusError); ok { - return se.status().Code() + if se, ok := err.(interface{ GRPCStatus() *Status }); ok { + return se.GRPCStatus().Code() } return codes.Unknown } diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index deb73592728..75a4e8d45b7 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -36,7 +36,10 @@ import ( ) // StreamHandler defines the handler called by gRPC server to complete the -// execution of a streaming RPC. +// execution of a streaming RPC. 
If a StreamHandler returns an error, it +// should be produced by the status package, or else gRPC will use +// codes.Unknown as the status code and err.Error() as the status message +// of the RPC. type StreamHandler func(srv interface{}, stream ServerStream) error // StreamDesc represents a streaming RPC service's method specification. @@ -99,6 +102,10 @@ type ClientStream interface { // NewStream creates a new Stream for the client side. This is typically // called by generated code. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { + // allow interceptor to see all applicable call options, which means those + // configured as defaults from dial option as well as per-call options + opts = combine(cc.dopts.callOptions, opts) + if cc.dopts.streamInt != nil { return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) } @@ -137,7 +144,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } }() - opts = append(cc.dopts.callOptions, opts...) for _, o := range opts { if err := o.before(c); err != nil { return nil, toRPCErr(err) @@ -202,11 +208,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } ctx = newContextWithRPCInfo(ctx, c.failFast) sh := cc.dopts.copts.StatsHandler + var beginTime time.Time if sh != nil { ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) + beginTime = time.Now() begin := &stats.Begin{ Client: true, - BeginTime: time.Now(), + BeginTime: beginTime, FailFast: c.failFast, } sh.HandleRPC(ctx, begin) @@ -214,8 +222,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if err != nil { // Only handle end stats if err != nil. 
end := &stats.End{ - Client: true, - Error: err, + Client: true, + Error: err, + BeginTime: beginTime, + EndTime: time.Now(), } sh.HandleRPC(ctx, end) } @@ -259,28 +269,28 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth break } - c.stream = s cs := &clientStream{ opts: opts, c: c, desc: desc, codec: c.codec, cp: cp, - dc: cc.dopts.dc, comp: comp, cancel: cancel, - - done: done, - t: t, - s: s, - p: &parser{r: s}, - - tracing: EnableTracing, - trInfo: trInfo, - - statsCtx: ctx, - statsHandler: cc.dopts.copts.StatsHandler, - } + attempt: &csAttempt{ + t: t, + s: s, + p: &parser{r: s}, + done: done, + dc: cc.dopts.dc, + ctx: ctx, + trInfo: trInfo, + statsHandler: sh, + beginTime: beginTime, + }, + } + cs.c.stream = cs + cs.attempt.cs = cs if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the // ClientConn or cancels the stream context. In all other cases, an error @@ -292,7 +302,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth case <-cc.ctx.Done(): cs.finish(ErrClientConnClosing) case <-ctx.Done(): - cs.finish(toRPCErr(s.Context().Err())) + cs.finish(toRPCErr(ctx.Err())) } }() } @@ -303,46 +313,56 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth type clientStream struct { opts []CallOption c *callInfo + desc *StreamDesc + + codec baseCodec + cp Compressor + comp encoding.Compressor + + cancel context.CancelFunc // cancels all attempts + + sentLast bool // sent an end stream + + mu sync.Mutex // guards finished + finished bool // TODO: replace with atomic cmpxchg or sync.Once? + + attempt *csAttempt // the active client stream attempt + // TODO(hedging): hedging will have multiple attempts simultaneously. +} + +// csAttempt implements a single transport stream attempt within a +// clientStream. 
+type csAttempt struct { + cs *clientStream t transport.ClientTransport s *transport.Stream p *parser - desc *StreamDesc + done func(balancer.DoneInfo) - codec baseCodec - cp Compressor dc Decompressor - comp encoding.Compressor decomp encoding.Compressor decompSet bool - // cancel is only called when RecvMsg() returns non-nil error, which means - // the stream finishes with error or with io.EOF. - cancel context.CancelFunc - - tracing bool // set to EnableTracing when the clientStream is created. + ctx context.Context // the application's context, wrapped by stats/tracing - mu sync.Mutex - done func(balancer.DoneInfo) - sentLast bool // sent an end stream - finished bool - // trInfo.tr is set when the clientStream is created (if EnableTracing is true), - // and is set to nil when the clientStream's finish method is called. + mu sync.Mutex // guards trInfo.tr + // trInfo.tr is set when created (if EnableTracing is true), + // and cleared when the finish method is called. trInfo traceInfo - // statsCtx keeps the user context for stats handling. - // All stats collection should use the statsCtx (instead of the stream context) - // so that all the generated stats for a particular RPC can be associated in the processing phase. - statsCtx context.Context statsHandler stats.Handler + beginTime time.Time } func (cs *clientStream) Context() context.Context { - return cs.s.Context() + // TODO(retry): commit the current attempt (the context has peer-aware data). + return cs.attempt.context() } func (cs *clientStream) Header() (metadata.MD, error) { - m, err := cs.s.Header() + m, err := cs.attempt.header() if err != nil { + // TODO(retry): maybe retry on error or commit attempt on success. err = toRPCErr(err) cs.finish(err) } @@ -350,20 +370,61 @@ func (cs *clientStream) Header() (metadata.MD, error) { } func (cs *clientStream) Trailer() metadata.MD { - return cs.s.Trailer() + // TODO(retry): on error, maybe retry (trailers-only). 
+ return cs.attempt.trailer() } func (cs *clientStream) SendMsg(m interface{}) (err error) { - // TODO: Check cs.sentLast and error if we already ended the stream. - if cs.tracing { - cs.mu.Lock() - if cs.trInfo.tr != nil { - cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) - } + // TODO(retry): buffer message for replaying if not committed. + return cs.attempt.sendMsg(m) +} + +func (cs *clientStream) RecvMsg(m interface{}) (err error) { + // TODO(retry): maybe retry on error or commit attempt on success. + return cs.attempt.recvMsg(m) +} + +func (cs *clientStream) CloseSend() error { + cs.attempt.closeSend() + return nil +} + +func (cs *clientStream) finish(err error) { + if err == io.EOF { + // Ending a stream with EOF indicates a success. + err = nil + } + cs.mu.Lock() + if cs.finished { cs.mu.Unlock() + return + } + cs.finished = true + cs.mu.Unlock() + // TODO(retry): commit current attempt if necessary. + cs.attempt.finish(err) + for _, o := range cs.opts { + o.after(cs.c) } + cs.cancel() +} + +func (a *csAttempt) context() context.Context { + return a.s.Context() +} + +func (a *csAttempt) header() (metadata.MD, error) { + return a.s.Header() +} + +func (a *csAttempt) trailer() metadata.MD { + return a.s.Trailer() +} + +func (a *csAttempt) sendMsg(m interface{}) (err error) { // TODO Investigate how to signal the stats handling party. // generate error stats if err != nil && err != io.EOF? + cs := a.cs defer func() { // For non-client-streaming RPCs, we return nil instead of EOF on success // because the generated code requires it. finish is not called; RecvMsg() @@ -372,14 +433,23 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { err = nil } if err != nil && err != io.EOF { - // Call finish for errors generated by this SendMsg call. (Transport + // Call finish on the client stream for errors generated by this SendMsg + // call, as these indicate problems created by this client. 
(Transport // errors are converted to an io.EOF error below; the real error will be - // returned from RecvMsg eventually in that case.) + // returned from RecvMsg eventually in that case, or be retried.) cs.finish(err) } }() + // TODO: Check cs.sentLast and error if we already ended the stream. + if EnableTracing { + a.mu.Lock() + if a.trInfo.tr != nil { + a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) + } + a.mu.Unlock() + } var outPayload *stats.OutPayload - if cs.statsHandler != nil { + if a.statsHandler != nil { outPayload = &stats.OutPayload{ Client: true, } @@ -394,18 +464,19 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if !cs.desc.ClientStreams { cs.sentLast = true } - err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams}) + err = a.t.Write(a.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams}) if err == nil { if outPayload != nil { outPayload.SentTime = time.Now() - cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) + a.statsHandler.HandleRPC(a.ctx, outPayload) } return nil } return io.EOF } -func (cs *clientStream) RecvMsg(m interface{}) (err error) { +func (a *csAttempt) recvMsg(m interface{}) (err error) { + cs := a.cs defer func() { if err != nil || !cs.desc.ServerStreams { // err != nil or non-server-streaming indicates end of stream. @@ -413,46 +484,46 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } }() var inPayload *stats.InPayload - if cs.statsHandler != nil { + if a.statsHandler != nil { inPayload = &stats.InPayload{ Client: true, } } - if !cs.decompSet { + if !a.decompSet { // Block until we receive headers containing received message encoding. 
- if ct := cs.s.RecvCompress(); ct != "" && ct != encoding.Identity { - if cs.dc == nil || cs.dc.Type() != ct { + if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { + if a.dc == nil || a.dc.Type() != ct { // No configured decompressor, or it does not match the incoming // message encoding; attempt to find a registered compressor that does. - cs.dc = nil - cs.decomp = encoding.GetCompressor(ct) + a.dc = nil + a.decomp = encoding.GetCompressor(ct) } } else { // No compression is used; disable our decompressor. - cs.dc = nil + a.dc = nil } // Only initialize this state once per stream. - cs.decompSet = true + a.decompSet = true } - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload, cs.decomp) + err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp) if err != nil { if err == io.EOF { - if statusErr := cs.s.Status().Err(); statusErr != nil { + if statusErr := a.s.Status().Err(); statusErr != nil { return statusErr } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } - if cs.tracing { - cs.mu.Lock() - if cs.trInfo.tr != nil { - cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) + if EnableTracing { + a.mu.Lock() + if a.trInfo.tr != nil { + a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) } - cs.mu.Unlock() + a.mu.Unlock() } if inPayload != nil { - cs.statsHandler.HandleRPC(cs.statsCtx, inPayload) + a.statsHandler.HandleRPC(a.ctx, inPayload) } if cs.desc.ServerStreams { // Subsequent messages should be received by subsequent RecvMsg calls. @@ -461,74 +532,59 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { // Special handling for non-server-stream rpcs. // This recv expects EOF or errors, so we don't collect inPayload. 
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp) + err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) } if err == io.EOF { - return cs.s.Status().Err() // non-server streaming Recv returns nil on success + return a.s.Status().Err() // non-server streaming Recv returns nil on success } return toRPCErr(err) } -func (cs *clientStream) CloseSend() error { +func (a *csAttempt) closeSend() { + cs := a.cs if cs.sentLast { - return nil + return } cs.sentLast = true - cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true}) - // We ignore errors from Write and always return nil here. Any error it - // would return would also be returned by a subsequent RecvMsg call, and the - // user is supposed to always finish the stream by calling RecvMsg until it - // returns err != nil. - return nil + cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true}) + // We ignore errors from Write. Any error it would return would also be + // returned by a subsequent RecvMsg call, and the user is supposed to always + // finish the stream by calling RecvMsg until it returns err != nil. } -func (cs *clientStream) finish(err error) { - if err == io.EOF { - // Ending a stream with EOF indicates a success. 
- err = nil - } - cs.mu.Lock() - defer cs.mu.Unlock() - if cs.finished { - return - } - cs.finished = true - cs.t.CloseStream(cs.s, err) - for _, o := range cs.opts { - o.after(cs.c) - } - if cs.done != nil { - cs.done(balancer.DoneInfo{ +func (a *csAttempt) finish(err error) { + a.mu.Lock() + a.t.CloseStream(a.s, err) + + if a.done != nil { + a.done(balancer.DoneInfo{ Err: err, BytesSent: true, - BytesReceived: cs.s.BytesReceived(), + BytesReceived: a.s.BytesReceived(), }) - cs.done = nil } - if cs.statsHandler != nil { + if a.statsHandler != nil { end := &stats.End{ - Client: true, - EndTime: time.Now(), - Error: err, + Client: true, + BeginTime: a.beginTime, + EndTime: time.Now(), + Error: err, } - cs.statsHandler.HandleRPC(cs.statsCtx, end) + a.statsHandler.HandleRPC(a.ctx, end) } - cs.cancel() - if !cs.tracing { - return - } - if cs.trInfo.tr != nil { + if a.trInfo.tr != nil { if err == nil { - cs.trInfo.tr.LazyPrintf("RPC: [OK]") + a.trInfo.tr.LazyPrintf("RPC: [OK]") } else { - cs.trInfo.tr.LazyPrintf("RPC: [%v]", err) - cs.trInfo.tr.SetError() + a.trInfo.tr.LazyPrintf("RPC: [%v]", err) + a.trInfo.tr.SetError() } - cs.trInfo.tr.Finish() - cs.trInfo.tr = nil + a.trInfo.tr.Finish() + a.trInfo.tr = nil } + a.mu.Unlock() } // ServerStream defines the interface a server stream has to satisfy. @@ -552,6 +608,7 @@ type ServerStream interface { // serverStream implements a server side Stream. type serverStream struct { + ctx context.Context t transport.ServerTransport s *transport.Stream p *parser @@ -572,7 +629,7 @@ type serverStream struct { } func (ss *serverStream) Context() context.Context { - return ss.s.Context() + return ss.ctx } func (ss *serverStream) SetHeader(md metadata.MD) error { @@ -675,9 +732,5 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { // MethodFromServerStream returns the method string for the input stream. // The returned string is in the format of "/service/method". 
func MethodFromServerStream(stream ServerStream) (string, bool) { - s, ok := transport.StreamFromContext(stream.Context()) - if !ok { - return "", ok - } - return s.Method(), ok + return Method(stream.Context()) } diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go index 451d7e629df..1a5e96c5a17 100644 --- a/vendor/google.golang.org/grpc/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/transport/handler_server.go @@ -98,7 +98,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta for _, v := range vv { v, err := decodeMetadataHeader(k, v) if err != nil { - return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err) + return nil, streamErrorf(codes.Internal, "malformed binary metadata: %v", err) } metakv = append(metakv, k, v) } @@ -354,8 +354,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace pr.AuthInfo = credentials.TLSInfo{State: *req.TLS} } ctx = metadata.NewIncomingContext(ctx, ht.headerMD) - ctx = peer.NewContext(ctx, pr) - s.ctx = newContextWithStream(ctx, s) + s.ctx = peer.NewContext(ctx, pr) if ht.stats != nil { s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) inHeader := &stats.InHeader{ diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index 56b434ef37f..8b5be0d6d51 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -121,18 +121,6 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error } func isTemporary(err error) bool { - switch err { - case io.EOF: - // Connection closures may be resolved upon retry, and are thus - // treated as temporary. 
- return true - case context.DeadlineExceeded: - // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this - // special case is not needed. Until then, we need to keep this - // clause. - return true - } - switch err := err.(type) { case interface { Temporary() bool @@ -145,7 +133,7 @@ func isTemporary(err error) bool { // temporary. return err.Timeout() } - return false + return true } // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 @@ -181,10 +169,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne scheme = "https" conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn) if err != nil { - // Credentials handshake errors are typically considered permanent - // to avoid retrying on e.g. bad certificates. - temp := isTemporary(err) - return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err) + return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err) } isSecure = true } diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index 24c2c7e18c4..97b214c640e 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -307,10 +307,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( pr.AuthInfo = t.authInfo } s.ctx = peer.NewContext(s.ctx, pr) - // Cache the current stream to the context so that the server application - // can find out. Required when the server wants to send some metadata - // back to the client (unary call only). - s.ctx = newContextWithStream(s.ctx, s) // Attach the received metadata to the context. 
if len(state.mdata) > 0 { s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) @@ -896,9 +892,6 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e // ltq is only a soft limit. streamQuota -= size p := r[:size] - // Reset ping strikes when sending data since this might cause - // the peer to send ping. - atomic.StoreUint32(&t.resetPingStrikes, 1) success := func() { ltq := ltq t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() { @@ -1013,6 +1006,9 @@ var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} func (t *http2Server) itemHandler(i item) error { switch i := i.(type) { case *dataFrame: + // Reset ping strikes when sending data since this might cause + // the peer to send ping. + atomic.StoreUint32(&t.resetPingStrikes, 1) if err := t.framer.fr.WriteData(i.streamID, i.endStream, i.d); err != nil { return err } diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go index 34476773163..de37e38ec9f 100644 --- a/vendor/google.golang.org/grpc/transport/http_util.go +++ b/vendor/google.golang.org/grpc/transport/http_util.go @@ -70,7 +70,7 @@ var ( http2.ErrCodeConnect: codes.Internal, http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted, http2.ErrCodeInadequateSecurity: codes.PermissionDenied, - http2.ErrCodeHTTP11Required: codes.FailedPrecondition, + http2.ErrCodeHTTP11Required: codes.Internal, } statusCodeConvTab = map[codes.Code]http2.ErrCode{ codes.Internal: http2.ErrCodeInternal, @@ -283,7 +283,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { case "content-type": contentSubtype, validContentType := contentSubtype(f.Value) if !validContentType { - return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value) + return streamErrorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value) } d.contentSubtype = contentSubtype // TODO: do we 
want to propagate the whole content-type in the metadata, diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go index e68f89ec459..e0c1e343e7a 100644 --- a/vendor/google.golang.org/grpc/transport/transport.go +++ b/vendor/google.golang.org/grpc/transport/transport.go @@ -366,6 +366,14 @@ func (s *Stream) SetHeader(md metadata.MD) error { return nil } +// SendHeader sends the given header metadata. The given metadata is +// combined with any metadata set by previous calls to SetHeader and +// then written to the transport stream. +func (s *Stream) SendHeader(md metadata.MD) error { + t := s.ServerTransport() + return t.WriteHeader(s, md) +} + // SetTrailer sets the trailer metadata which will be sent with the RPC status // by the server. This can be called multiple times. Server side only. func (s *Stream) SetTrailer(md metadata.MD) error { @@ -445,21 +453,6 @@ func (s *Stream) GoString() string { return fmt.Sprintf("", s, s.method) } -// The key to save transport.Stream in the context. -type streamKey struct{} - -// newContextWithStream creates a new context from ctx and attaches stream -// to it. -func newContextWithStream(ctx context.Context, stream *Stream) context.Context { - return context.WithValue(ctx, streamKey{}, stream) -} - -// StreamFromContext returns the stream saved in ctx. 
-func StreamFromContext(ctx context.Context) (s *Stream, ok bool) { - s, ok = ctx.Value(streamKey{}).(*Stream) - return -} - // state of transport type transportState int diff --git a/vendor/vendor.json b/vendor/vendor.json index d75f48abf88..0ebbb29edb5 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -437,220 +437,220 @@ "versionExact": "master" }, { - "checksumSHA1": "DGnsWyF+0V5UX3i9VVgKYZ8NwG0=", + "checksumSHA1": "Qi3LcG4b9bGoC1W4Mhlks4DI3Ss=", "path": "google.golang.org/grpc", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "xBhmO0Vn4kzbmySioX+2gBImrkk=", "path": "google.golang.org/grpc/balancer", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "CPWX/IgaQSR3+78j4sPrvHNkW+U=", "path": "google.golang.org/grpc/balancer/base", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "DJ1AtOk4Pu7bqtUMob95Hw8HPNw=", "path": "google.golang.org/grpc/balancer/roundrobin", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": 
"v1.11.3" }, { "checksumSHA1": "j8Qs+yfgwYYOtodB/1bSlbzV5rs=", "path": "google.golang.org/grpc/codes", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "XH2WYcDNwVO47zYShREJjcYXm0Y=", "path": "google.golang.org/grpc/connectivity", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "KthiDKNPHMeIu967enqtE4NaZzI=", "path": "google.golang.org/grpc/credentials", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "mJTBJC0n9J2CV+tHX+dJosYOZmg=", "path": "google.golang.org/grpc/encoding", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "LKKkn7EYA+Do9Qwb2/SUKLFNxoo=", "path": "google.golang.org/grpc/encoding/proto", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" 
}, { "checksumSHA1": "H7SuPUqbPcdbNqgl+k3ohuwMAwE=", "path": "google.golang.org/grpc/grpclb/grpc_lb_v1/messages", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "ntHev01vgZgeIh5VFRmbLx/BSTo=", "path": "google.golang.org/grpc/grpclog", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "DyM0uqLtknaI4THSc3spn9XlL+g=", "path": "google.golang.org/grpc/health", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "6vY7tYjV84pnr3sDctzx53Bs8b0=", "path": "google.golang.org/grpc/health/grpc_health_v1", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "Qvf3zdmRCSsiM/VoBv0qB/naHtU=", "path": "google.golang.org/grpc/internal", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": 
"v1.11.3" }, { "checksumSHA1": "hcuHgKp8W0wIzoCnNfKI8NUss5o=", "path": "google.golang.org/grpc/keepalive", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { - "checksumSHA1": "X1BGbIb3xaxiAG4O1Ot5YjPlh4g=", + "checksumSHA1": "RUgjR0iUFLCgdLAnNqiH+8jTzuk=", "path": "google.golang.org/grpc/metadata", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "5dwF592DPvhF2Wcex3m7iV6aGRQ=", "path": "google.golang.org/grpc/naming", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "n5EgDdBqFMa2KQFhtl+FF/4gIFo=", "path": "google.golang.org/grpc/peer", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { - "checksumSHA1": "JF/KBFCo5JwVtXfrZ2kJnFRC6W8=", + "checksumSHA1": "780k7ZcT5M32PTx7AmxkxMlZ/Wk=", "path": "google.golang.org/grpc/reflection", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", 
+ "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "7Ax2K0St9CIi1rkA9Ju+2ERfe9E=", "path": "google.golang.org/grpc/reflection/grpc_reflection_v1alpha", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "qbA3XLvX0RTvaqQefvFDtE9GaJs=", "path": "google.golang.org/grpc/resolver", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "WpWF+bDzObsHf+bjoGpb/abeFxo=", "path": "google.golang.org/grpc/resolver/dns", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "zs9M4xE8Lyg4wvuYvR00XoBxmuw=", "path": "google.golang.org/grpc/resolver/passthrough", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { - "checksumSHA1": "G9lgXNi7qClo5sM2s6TbTHLFR3g=", + "checksumSHA1": "YclPgme2gT3S0hTkHVdE1zAxJdo=", "path": "google.golang.org/grpc/stats", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - 
"versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { - "checksumSHA1": "/7i6dC0tFTtGMxykj9VduLEfBCU=", + "checksumSHA1": "FXiovlBmrYdS4QT0Z4nV+x+v5HI=", "path": "google.golang.org/grpc/status", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { "checksumSHA1": "qvArRhlrww5WvRmbyMF2mUfbJew=", "path": "google.golang.org/grpc/tap", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" }, { - "checksumSHA1": "fgt81mMAzx0Zo0ZuI2Vv0/RYApA=", + "checksumSHA1": "sg7RY87LaWXaZMj0cuLQQaJJQYo=", "path": "google.golang.org/grpc/transport", - "revision": "61763f5bcf26276d4c255c89e9dcf8f9838a271b", - "revisionTime": "2018-03-28T23:38:55Z", - "version": "v1.10", - "versionExact": "v1.10.1" + "revision": "d11072e7ca9811b1100b80ca0269ac831f06d024", + "revisionTime": "2018-04-09T20:31:48Z", + "version": "v1.11", + "versionExact": "v1.11.3" } ], "rootPath": "gitlab.com/gitlab-org/gitaly" -- GitLab