interceptor.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3rpc
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. "go.etcd.io/etcd/v3/etcdserver"
  20. "go.etcd.io/etcd/v3/etcdserver/api"
  21. "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
  22. "go.etcd.io/etcd/v3/pkg/types"
  23. "go.etcd.io/etcd/v3/raft"
  24. "github.com/coreos/pkg/capnslog"
  25. pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
  26. "go.uber.org/zap"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/metadata"
  29. "google.golang.org/grpc/peer"
  30. )
  31. const (
  32. maxNoLeaderCnt = 3
  33. )
  34. type streamsMap struct {
  35. mu sync.Mutex
  36. streams map[grpc.ServerStream]struct{}
  37. }
  38. func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  39. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  40. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  41. return nil, rpctypes.ErrGRPCNotCapable
  42. }
  43. // TODO: add test in clientv3/integration to verify behavior
  44. if s.IsLearner() && !isRPCEnabledForLearner(req) {
  45. return nil, rpctypes.ErrGPRCNotSupportedForLearner
  46. }
  47. md, ok := metadata.FromIncomingContext(ctx)
  48. if ok {
  49. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  50. if s.Leader() == types.ID(raft.None) {
  51. return nil, rpctypes.ErrGRPCNoLeader
  52. }
  53. }
  54. }
  55. return handler(ctx, req)
  56. }
  57. }
  58. func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  59. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  60. startTime := time.Now()
  61. resp, err := handler(ctx, req)
  62. lg := s.Logger()
  63. if (lg != nil && lg.Core().Enabled(zap.DebugLevel)) || // using zap logger and debug level is enabled
  64. (lg == nil && plog.LevelAt(capnslog.DEBUG)) { // or, using capnslog and debug level is enabled
  65. defer logUnaryRequestStats(ctx, lg, info, startTime, req, resp)
  66. }
  67. return resp, err
  68. }
  69. }
  70. func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
  71. duration := time.Since(startTime)
  72. remote := "No remote client info."
  73. peerInfo, ok := peer.FromContext(ctx)
  74. if ok {
  75. remote = peerInfo.Addr.String()
  76. }
  77. responseType := info.FullMethod
  78. var reqCount, respCount int64
  79. var reqSize, respSize int
  80. var reqContent string
  81. switch _resp := resp.(type) {
  82. case *pb.RangeResponse:
  83. _req, ok := req.(*pb.RangeRequest)
  84. if ok {
  85. reqCount = 0
  86. reqSize = _req.Size()
  87. reqContent = _req.String()
  88. }
  89. if _resp != nil {
  90. respCount = _resp.GetCount()
  91. respSize = _resp.Size()
  92. }
  93. case *pb.PutResponse:
  94. _req, ok := req.(*pb.PutRequest)
  95. if ok {
  96. reqCount = 1
  97. reqSize = _req.Size()
  98. reqContent = pb.NewLoggablePutRequest(_req).String()
  99. // redact value field from request content, see PR #9821
  100. }
  101. if _resp != nil {
  102. respCount = 0
  103. respSize = _resp.Size()
  104. }
  105. case *pb.DeleteRangeResponse:
  106. _req, ok := req.(*pb.DeleteRangeRequest)
  107. if ok {
  108. reqCount = 0
  109. reqSize = _req.Size()
  110. reqContent = _req.String()
  111. }
  112. if _resp != nil {
  113. respCount = _resp.GetDeleted()
  114. respSize = _resp.Size()
  115. }
  116. case *pb.TxnResponse:
  117. _req, ok := req.(*pb.TxnRequest)
  118. if ok && _resp != nil {
  119. if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
  120. reqCount = int64(len(_req.GetSuccess()))
  121. reqSize = 0
  122. for _, r := range _req.GetSuccess() {
  123. reqSize += r.Size()
  124. }
  125. } else {
  126. reqCount = int64(len(_req.GetFailure()))
  127. reqSize = 0
  128. for _, r := range _req.GetFailure() {
  129. reqSize += r.Size()
  130. }
  131. }
  132. reqContent = pb.NewLoggableTxnRequest(_req).String()
  133. // redact value field from request content, see PR #9821
  134. }
  135. if _resp != nil {
  136. respCount = 0
  137. respSize = _resp.Size()
  138. }
  139. default:
  140. reqCount = -1
  141. reqSize = -1
  142. respCount = -1
  143. respSize = -1
  144. }
  145. logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
  146. }
  147. func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
  148. reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
  149. if lg == nil {
  150. plog.Debugf("start time = %v, "+
  151. "time spent = %v, "+
  152. "remote = %s, "+
  153. "response type = %s, "+
  154. "request count = %d, "+
  155. "request size = %d, "+
  156. "response count = %d, "+
  157. "response size = %d, "+
  158. "request content = %s",
  159. startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
  160. )
  161. } else {
  162. lg.Debug("request stats",
  163. zap.Time("start time", startTime),
  164. zap.Duration("time spent", duration),
  165. zap.String("remote", remote),
  166. zap.String("response type", responseType),
  167. zap.Int64("request count", reqCount),
  168. zap.Int("request size", reqSize),
  169. zap.Int64("response count", respCount),
  170. zap.Int("response size", respSize),
  171. zap.String("request content", reqContent),
  172. )
  173. }
  174. }
  175. func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
  176. smap := monitorLeader(s)
  177. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  178. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  179. return rpctypes.ErrGRPCNotCapable
  180. }
  181. if s.IsLearner() { // learner does not support Watch and LeaseKeepAlive RPC
  182. return rpctypes.ErrGPRCNotSupportedForLearner
  183. }
  184. md, ok := metadata.FromIncomingContext(ss.Context())
  185. if ok {
  186. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  187. if s.Leader() == types.ID(raft.None) {
  188. return rpctypes.ErrGRPCNoLeader
  189. }
  190. cctx, cancel := context.WithCancel(ss.Context())
  191. ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
  192. smap.mu.Lock()
  193. smap.streams[ss] = struct{}{}
  194. smap.mu.Unlock()
  195. defer func() {
  196. smap.mu.Lock()
  197. delete(smap.streams, ss)
  198. smap.mu.Unlock()
  199. cancel()
  200. }()
  201. }
  202. }
  203. return handler(srv, ss)
  204. }
  205. }
  206. type serverStreamWithCtx struct {
  207. grpc.ServerStream
  208. ctx context.Context
  209. cancel *context.CancelFunc
  210. }
  211. func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
  212. func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
  213. smap := &streamsMap{
  214. streams: make(map[grpc.ServerStream]struct{}),
  215. }
  216. go func() {
  217. election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
  218. noLeaderCnt := 0
  219. for {
  220. select {
  221. case <-s.StopNotify():
  222. return
  223. case <-time.After(election):
  224. if s.Leader() == types.ID(raft.None) {
  225. noLeaderCnt++
  226. } else {
  227. noLeaderCnt = 0
  228. }
  229. // We are more conservative on canceling existing streams. Reconnecting streams
  230. // cost much more than just rejecting new requests. So we wait until the member
  231. // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
  232. if noLeaderCnt >= maxNoLeaderCnt {
  233. smap.mu.Lock()
  234. for ss := range smap.streams {
  235. if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
  236. (*ssWithCtx.cancel)()
  237. <-ss.Context().Done()
  238. }
  239. }
  240. smap.streams = make(map[grpc.ServerStream]struct{})
  241. smap.mu.Unlock()
  242. }
  243. }
  244. }
  245. }()
  246. return smap
  247. }