sheddinginterceptor.go 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. package serverinterceptors
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/load"
  6. "github.com/tal-tech/go-zero/core/stat"
  7. "google.golang.org/grpc"
  8. )
  9. const serviceType = "rpc"
  10. var (
  11. sheddingStat *load.SheddingStat
  12. lock sync.Mutex
  13. )
  14. func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
  15. ensureSheddingStat()
  16. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
  17. handler grpc.UnaryHandler) (val interface{}, err error) {
  18. sheddingStat.IncrementTotal()
  19. var promise load.Promise
  20. promise, err = shedder.Allow()
  21. if err != nil {
  22. metrics.AddDrop()
  23. sheddingStat.IncrementDrop()
  24. return
  25. }
  26. defer func() {
  27. if err == context.DeadlineExceeded {
  28. promise.Fail()
  29. } else {
  30. sheddingStat.IncrementPass()
  31. promise.Pass()
  32. }
  33. }()
  34. return handler(ctx, req)
  35. }
  36. }
  37. func ensureSheddingStat() {
  38. lock.Lock()
  39. if sheddingStat == nil {
  40. sheddingStat = load.NewSheddingStat(serviceType)
  41. }
  42. lock.Unlock()
  43. }