interceptor.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3rpc
  15. import (
  16. "sync"
  17. "time"
  18. "github.com/coreos/etcd/etcdserver"
  19. "github.com/coreos/etcd/etcdserver/api"
  20. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  21. "github.com/coreos/etcd/pkg/types"
  22. "github.com/coreos/etcd/raft"
  23. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. "github.com/coreos/pkg/capnslog"
  25. prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  26. "golang.org/x/net/context"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/metadata"
  29. "google.golang.org/grpc/peer"
  30. )
  31. const (
  32. maxNoLeaderCnt = 3
  33. )
  34. type streamsMap struct {
  35. mu sync.Mutex
  36. streams map[grpc.ServerStream]struct{}
  37. }
  38. func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  39. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  40. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  41. return nil, rpctypes.ErrGRPCNotCapable
  42. }
  43. md, ok := metadata.FromIncomingContext(ctx)
  44. if ok {
  45. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  46. if s.Leader() == types.ID(raft.None) {
  47. return nil, rpctypes.ErrGRPCNoLeader
  48. }
  49. }
  50. }
  51. return logUnaryInterceptor(ctx, req, info, handler)
  52. // interceptors are chained manually during backporting, for better readability refer to PR #9990
  53. }
  54. }
  55. // logUnaryInterceptor is a gRPC server-side interceptor that prints info on incoming requests for debugging purpose
  56. func logUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  57. startTime := time.Now()
  58. resp, err := prometheus.UnaryServerInterceptor(ctx, req, info, handler)
  59. // interceptors are chained manually during backporting, for better readability refer to PR #9990
  60. if plog.LevelAt(capnslog.DEBUG) {
  61. defer logUnaryRequestStats(ctx, info, startTime, req, resp)
  62. }
  63. return resp, err
  64. }
  65. func logUnaryRequestStats(ctx context.Context, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
  66. duration := time.Since(startTime)
  67. remote := "No remote client info."
  68. peerInfo, ok := peer.FromContext(ctx)
  69. if ok {
  70. remote = peerInfo.Addr.String()
  71. }
  72. var responseType string = info.FullMethod
  73. var reqCount, respCount int64
  74. var reqSize, respSize int
  75. var reqContent string
  76. switch _resp := resp.(type) {
  77. case *pb.RangeResponse:
  78. _req, ok := req.(*pb.RangeRequest)
  79. if ok {
  80. reqCount = 0
  81. reqSize = _req.Size()
  82. reqContent = _req.String()
  83. }
  84. if _resp != nil {
  85. respCount = _resp.GetCount()
  86. respSize = _resp.Size()
  87. }
  88. case *pb.PutResponse:
  89. _req, ok := req.(*pb.PutRequest)
  90. if ok {
  91. reqCount = 1
  92. reqSize = _req.Size()
  93. reqContent = pb.NewLoggablePutRequest(_req).String()
  94. // redact value field from request content, see PR #9821
  95. }
  96. if _resp != nil {
  97. respCount = 0
  98. respSize = _resp.Size()
  99. }
  100. case *pb.DeleteRangeResponse:
  101. _req, ok := req.(*pb.DeleteRangeRequest)
  102. if ok {
  103. reqCount = 0
  104. reqSize = _req.Size()
  105. reqContent = _req.String()
  106. }
  107. if _resp != nil {
  108. respCount = _resp.GetDeleted()
  109. respSize = _resp.Size()
  110. }
  111. case *pb.TxnResponse:
  112. _req, ok := req.(*pb.TxnRequest)
  113. if ok && _resp != nil {
  114. if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
  115. reqCount = int64(len(_req.GetSuccess()))
  116. reqSize = 0
  117. for _, r := range _req.GetSuccess() {
  118. reqSize += r.Size()
  119. }
  120. } else {
  121. reqCount = int64(len(_req.GetFailure()))
  122. reqSize = 0
  123. for _, r := range _req.GetFailure() {
  124. reqSize += r.Size()
  125. }
  126. }
  127. reqContent = pb.NewLoggableTxnRequest(_req).String()
  128. // redact value field from request content, see PR #9821
  129. }
  130. if _resp != nil {
  131. respCount = 0
  132. respSize = _resp.Size()
  133. }
  134. default:
  135. reqCount = -1
  136. reqSize = -1
  137. respCount = -1
  138. respSize = -1
  139. }
  140. logGenericRequestStats(startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
  141. }
  142. func logGenericRequestStats(startTime time.Time, duration time.Duration, remote string, responseType string,
  143. reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
  144. plog.Debugf("start time = %v, "+
  145. "time spent = %v, "+
  146. "remote = %s, "+
  147. "response type = %s, "+
  148. "request count = %d, "+
  149. "request size = %d, "+
  150. "response count = %d, "+
  151. "response size = %d, "+
  152. "request content = %s",
  153. startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
  154. )
  155. }
  156. func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
  157. smap := monitorLeader(s)
  158. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  159. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  160. return rpctypes.ErrGRPCNotCapable
  161. }
  162. md, ok := metadata.FromIncomingContext(ss.Context())
  163. if ok {
  164. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  165. if s.Leader() == types.ID(raft.None) {
  166. return rpctypes.ErrGRPCNoLeader
  167. }
  168. cctx, cancel := context.WithCancel(ss.Context())
  169. ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
  170. smap.mu.Lock()
  171. smap.streams[ss] = struct{}{}
  172. smap.mu.Unlock()
  173. defer func() {
  174. smap.mu.Lock()
  175. delete(smap.streams, ss)
  176. smap.mu.Unlock()
  177. cancel()
  178. }()
  179. }
  180. }
  181. return prometheus.StreamServerInterceptor(srv, ss, info, handler)
  182. }
  183. }
  184. type serverStreamWithCtx struct {
  185. grpc.ServerStream
  186. ctx context.Context
  187. cancel *context.CancelFunc
  188. }
  189. func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
  190. func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
  191. smap := &streamsMap{
  192. streams: make(map[grpc.ServerStream]struct{}),
  193. }
  194. go func() {
  195. election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
  196. noLeaderCnt := 0
  197. for {
  198. select {
  199. case <-s.StopNotify():
  200. return
  201. case <-time.After(election):
  202. if s.Leader() == types.ID(raft.None) {
  203. noLeaderCnt++
  204. } else {
  205. noLeaderCnt = 0
  206. }
  207. // We are more conservative on canceling existing streams. Reconnecting streams
  208. // cost much more than just rejecting new requests. So we wait until the member
  209. // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
  210. if noLeaderCnt >= maxNoLeaderCnt {
  211. smap.mu.Lock()
  212. for ss := range smap.streams {
  213. if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
  214. (*ssWithCtx.cancel)()
  215. <-ss.Context().Done()
  216. }
  217. }
  218. smap.streams = make(map[grpc.ServerStream]struct{})
  219. smap.mu.Unlock()
  220. }
  221. }
  222. }
  223. }()
  224. return smap
  225. }