interceptor.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3rpc
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. "github.com/coreos/etcd/etcdserver"
  20. "github.com/coreos/etcd/etcdserver/api"
  21. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft"
  24. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. "go.uber.org/zap"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/metadata"
  28. "google.golang.org/grpc/peer"
  29. )
  // maxNoLeaderCnt is the number of consecutive election timeouts without a
  // known leader after which monitorLeader cancels the tracked
  // require-leader streams.
  const maxNoLeaderCnt = 3
  33. type streamsMap struct {
  34. mu sync.Mutex
  35. streams map[grpc.ServerStream]struct{}
  36. }
  37. func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  38. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  39. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  40. return nil, rpctypes.ErrGRPCNotCapable
  41. }
  42. md, ok := metadata.FromIncomingContext(ctx)
  43. if ok {
  44. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  45. if s.Leader() == types.ID(raft.None) {
  46. return nil, rpctypes.ErrGRPCNoLeader
  47. }
  48. }
  49. }
  50. resp, err := handler(ctx, req)
  51. return resp, err
  52. }
  53. }
  54. func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  55. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  56. startTime := time.Now()
  57. resp, err := handler(ctx, req)
  58. defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
  59. return resp, err
  60. }
  61. }
  62. func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
  63. duration := time.Since(startTime)
  64. remote := "No remote client info."
  65. peerInfo, ok := peer.FromContext(ctx)
  66. if ok {
  67. remote = peerInfo.Addr.String()
  68. }
  69. var responseType string = info.FullMethod
  70. var reqCount, respCount int64
  71. var reqSize, respSize int
  72. var reqContent string
  73. switch _resp := resp.(type) {
  74. case *pb.RangeResponse:
  75. _req, ok := req.(*pb.RangeRequest)
  76. if ok {
  77. reqCount = 0
  78. reqSize = _req.Size()
  79. reqContent = _req.String()
  80. }
  81. if _resp != nil {
  82. respCount = _resp.GetCount()
  83. respSize = _resp.Size()
  84. }
  85. case *pb.PutResponse:
  86. _req, ok := req.(*pb.PutRequest)
  87. if ok {
  88. reqCount = 1
  89. reqSize = _req.Size()
  90. reqContent = pb.NewLoggablePutRequest(_req).String()
  91. // redact value field from request content, see PR #9821
  92. }
  93. if _resp != nil {
  94. respCount = 0
  95. respSize = _resp.Size()
  96. }
  97. case *pb.DeleteRangeResponse:
  98. _req, ok := req.(*pb.DeleteRangeRequest)
  99. if ok {
  100. reqCount = 0
  101. reqSize = _req.Size()
  102. reqContent = _req.String()
  103. }
  104. if _resp != nil {
  105. respCount = _resp.GetDeleted()
  106. respSize = _resp.Size()
  107. }
  108. case *pb.TxnResponse:
  109. _req, ok := req.(*pb.TxnRequest)
  110. if ok && _resp != nil {
  111. if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
  112. reqCount = int64(len(_req.GetSuccess()))
  113. reqSize = 0
  114. for _, r := range _req.GetSuccess() {
  115. reqSize += r.Size()
  116. }
  117. } else {
  118. reqCount = int64(len(_req.GetFailure()))
  119. reqSize = 0
  120. for _, r := range _req.GetFailure() {
  121. reqSize += r.Size()
  122. }
  123. }
  124. reqContent = pb.NewLoggableTxnRequest(_req).String()
  125. // redact value field from request content, see PR #9821
  126. }
  127. if _resp != nil {
  128. respCount = 0
  129. respSize = _resp.Size()
  130. }
  131. default:
  132. reqCount = -1
  133. reqSize = -1
  134. respCount = -1
  135. respSize = -1
  136. }
  137. logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
  138. }
  139. func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
  140. reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
  141. if lg == nil {
  142. plog.Debugf("start time = %v, "+
  143. "time spent = %v, "+
  144. "remote = %s, "+
  145. "response type = %s, "+
  146. "request count = %d, "+
  147. "request size = %d, "+
  148. "response count = %d, "+
  149. "response size = %d, "+
  150. "request content = %s",
  151. startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
  152. )
  153. } else {
  154. lg.Debug("request stats",
  155. zap.Time("start time", startTime),
  156. zap.Duration("time spent", duration),
  157. zap.String("remote", remote),
  158. zap.String("response type", responseType),
  159. zap.Int64("request count", reqCount),
  160. zap.Int("request size", reqSize),
  161. zap.Int64("response count", respCount),
  162. zap.Int("response size", respSize),
  163. zap.String("request content", reqContent),
  164. )
  165. }
  166. }
  167. func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
  168. smap := monitorLeader(s)
  169. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  170. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  171. return rpctypes.ErrGRPCNotCapable
  172. }
  173. md, ok := metadata.FromIncomingContext(ss.Context())
  174. if ok {
  175. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  176. if s.Leader() == types.ID(raft.None) {
  177. return rpctypes.ErrGRPCNoLeader
  178. }
  179. cctx, cancel := context.WithCancel(ss.Context())
  180. ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
  181. smap.mu.Lock()
  182. smap.streams[ss] = struct{}{}
  183. smap.mu.Unlock()
  184. defer func() {
  185. smap.mu.Lock()
  186. delete(smap.streams, ss)
  187. smap.mu.Unlock()
  188. cancel()
  189. }()
  190. }
  191. }
  192. err := handler(srv, ss)
  193. return err
  194. }
  195. }
  196. type serverStreamWithCtx struct {
  197. grpc.ServerStream
  198. ctx context.Context
  199. cancel *context.CancelFunc
  200. }
  201. func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
  202. func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
  203. smap := &streamsMap{
  204. streams: make(map[grpc.ServerStream]struct{}),
  205. }
  206. go func() {
  207. election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
  208. noLeaderCnt := 0
  209. for {
  210. select {
  211. case <-s.StopNotify():
  212. return
  213. case <-time.After(election):
  214. if s.Leader() == types.ID(raft.None) {
  215. noLeaderCnt++
  216. } else {
  217. noLeaderCnt = 0
  218. }
  219. // We are more conservative on canceling existing streams. Reconnecting streams
  220. // cost much more than just rejecting new requests. So we wait until the member
  221. // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
  222. if noLeaderCnt >= maxNoLeaderCnt {
  223. smap.mu.Lock()
  224. for ss := range smap.streams {
  225. if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
  226. (*ssWithCtx.cancel)()
  227. <-ss.Context().Done()
  228. }
  229. }
  230. smap.streams = make(map[grpc.ServerStream]struct{})
  231. smap.mu.Unlock()
  232. }
  233. }
  234. }
  235. }()
  236. return smap
  237. }