interceptor.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3rpc
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. "github.com/coreos/etcd/etcdserver"
  20. "github.com/coreos/etcd/etcdserver/api"
  21. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft"
  24. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. "go.uber.org/zap"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/metadata"
  28. "google.golang.org/grpc/peer"
  29. )
  30. const (
  31. maxNoLeaderCnt = 3
  32. )
  33. type streamsMap struct {
  34. mu sync.Mutex
  35. streams map[grpc.ServerStream]struct{}
  36. }
  37. func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  38. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  39. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  40. return nil, rpctypes.ErrGRPCNotCapable
  41. }
  42. md, ok := metadata.FromIncomingContext(ctx)
  43. if ok {
  44. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  45. if s.Leader() == types.ID(raft.None) {
  46. return nil, rpctypes.ErrGRPCNoLeader
  47. }
  48. }
  49. }
  50. return handler(ctx, req)
  51. }
  52. }
  53. func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  54. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  55. startTime := time.Now()
  56. resp, err := handler(ctx, req)
  57. defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
  58. return resp, err
  59. }
  60. }
  61. func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
  62. duration := time.Since(startTime)
  63. remote := "No remote client info."
  64. peerInfo, ok := peer.FromContext(ctx)
  65. if ok {
  66. remote = peerInfo.Addr.String()
  67. }
  68. var responseType string = info.FullMethod
  69. var reqCount, respCount int64
  70. var reqSize, respSize int
  71. var reqContent string
  72. switch _resp := resp.(type) {
  73. case *pb.RangeResponse:
  74. _req, ok := req.(*pb.RangeRequest)
  75. if ok {
  76. reqCount = 0
  77. reqSize = _req.Size()
  78. reqContent = _req.String()
  79. }
  80. if _resp != nil {
  81. respCount = _resp.GetCount()
  82. respSize = _resp.Size()
  83. }
  84. case *pb.PutResponse:
  85. _req, ok := req.(*pb.PutRequest)
  86. if ok {
  87. reqCount = 1
  88. reqSize = _req.Size()
  89. reqContent = pb.NewLoggablePutRequest(_req).String()
  90. // redact value field from request content, see PR #9821
  91. }
  92. if _resp != nil {
  93. respCount = 0
  94. respSize = _resp.Size()
  95. }
  96. case *pb.DeleteRangeResponse:
  97. _req, ok := req.(*pb.DeleteRangeRequest)
  98. if ok {
  99. reqCount = 0
  100. reqSize = _req.Size()
  101. reqContent = _req.String()
  102. }
  103. if _resp != nil {
  104. respCount = _resp.GetDeleted()
  105. respSize = _resp.Size()
  106. }
  107. case *pb.TxnResponse:
  108. _req, ok := req.(*pb.TxnRequest)
  109. if ok && _resp != nil {
  110. if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
  111. reqCount = int64(len(_req.GetSuccess()))
  112. reqSize = 0
  113. for _, r := range _req.GetSuccess() {
  114. reqSize += r.Size()
  115. }
  116. } else {
  117. reqCount = int64(len(_req.GetFailure()))
  118. reqSize = 0
  119. for _, r := range _req.GetFailure() {
  120. reqSize += r.Size()
  121. }
  122. }
  123. reqContent = pb.NewLoggableTxnRequest(_req).String()
  124. // redact value field from request content, see PR #9821
  125. }
  126. if _resp != nil {
  127. respCount = 0
  128. respSize = _resp.Size()
  129. }
  130. default:
  131. reqCount = -1
  132. reqSize = -1
  133. respCount = -1
  134. respSize = -1
  135. }
  136. logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
  137. }
  138. func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
  139. reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
  140. if lg == nil {
  141. plog.Debugf("start time = %v, "+
  142. "time spent = %v, "+
  143. "remote = %s, "+
  144. "response type = %s, "+
  145. "request count = %d, "+
  146. "request size = %d, "+
  147. "response count = %d, "+
  148. "response size = %d, "+
  149. "request content = %s",
  150. startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
  151. )
  152. } else {
  153. lg.Debug("request stats",
  154. zap.Time("start time", startTime),
  155. zap.Duration("time spent", duration),
  156. zap.String("remote", remote),
  157. zap.String("response type", responseType),
  158. zap.Int64("request count", reqCount),
  159. zap.Int("request size", reqSize),
  160. zap.Int64("response count", respCount),
  161. zap.Int("response size", respSize),
  162. zap.String("request content", reqContent),
  163. )
  164. }
  165. }
  166. func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
  167. smap := monitorLeader(s)
  168. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  169. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  170. return rpctypes.ErrGRPCNotCapable
  171. }
  172. md, ok := metadata.FromIncomingContext(ss.Context())
  173. if ok {
  174. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  175. if s.Leader() == types.ID(raft.None) {
  176. return rpctypes.ErrGRPCNoLeader
  177. }
  178. cctx, cancel := context.WithCancel(ss.Context())
  179. ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
  180. smap.mu.Lock()
  181. smap.streams[ss] = struct{}{}
  182. smap.mu.Unlock()
  183. defer func() {
  184. smap.mu.Lock()
  185. delete(smap.streams, ss)
  186. smap.mu.Unlock()
  187. cancel()
  188. }()
  189. }
  190. }
  191. return handler(srv, ss)
  192. }
  193. }
  194. type serverStreamWithCtx struct {
  195. grpc.ServerStream
  196. ctx context.Context
  197. cancel *context.CancelFunc
  198. }
  199. func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
  200. func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
  201. smap := &streamsMap{
  202. streams: make(map[grpc.ServerStream]struct{}),
  203. }
  204. go func() {
  205. election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
  206. noLeaderCnt := 0
  207. for {
  208. select {
  209. case <-s.StopNotify():
  210. return
  211. case <-time.After(election):
  212. if s.Leader() == types.ID(raft.None) {
  213. noLeaderCnt++
  214. } else {
  215. noLeaderCnt = 0
  216. }
  217. // We are more conservative on canceling existing streams. Reconnecting streams
  218. // cost much more than just rejecting new requests. So we wait until the member
  219. // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
  220. if noLeaderCnt >= maxNoLeaderCnt {
  221. smap.mu.Lock()
  222. for ss := range smap.streams {
  223. if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
  224. (*ssWithCtx.cancel)()
  225. <-ss.Context().Done()
  226. }
  227. }
  228. smap.streams = make(map[grpc.ServerStream]struct{})
  229. smap.mu.Unlock()
  230. }
  231. }
  232. }
  233. }()
  234. return smap
  235. }