sheddinghandler.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package handler
  2. import (
  3. "net/http"
  4. "sync"
  5. "github.com/tal-tech/go-zero/core/load"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. "github.com/tal-tech/go-zero/core/stat"
  8. "github.com/tal-tech/go-zero/rest/httpx"
  9. "github.com/tal-tech/go-zero/rest/internal/security"
  10. )
  11. const serviceType = "api"
  12. var (
  13. sheddingStat *load.SheddingStat
  14. lock sync.Mutex
  15. )
  16. func SheddingHandler(shedder load.Shedder, metrics *stat.Metrics) func(http.Handler) http.Handler {
  17. if shedder == nil {
  18. return func(next http.Handler) http.Handler {
  19. return next
  20. }
  21. }
  22. ensureSheddingStat()
  23. return func(next http.Handler) http.Handler {
  24. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  25. sheddingStat.IncrementTotal()
  26. promise, err := shedder.Allow()
  27. if err != nil {
  28. metrics.AddDrop()
  29. sheddingStat.IncrementDrop()
  30. logx.Errorf("[http] dropped, %s - %s - %s",
  31. r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
  32. w.WriteHeader(http.StatusServiceUnavailable)
  33. return
  34. }
  35. cw := &security.WithCodeResponseWriter{Writer: w}
  36. defer func() {
  37. if cw.Code == http.StatusServiceUnavailable {
  38. promise.Fail()
  39. } else {
  40. sheddingStat.IncrementPass()
  41. promise.Pass()
  42. }
  43. }()
  44. next.ServeHTTP(cw, r)
  45. })
  46. }
  47. }
  48. func ensureSheddingStat() {
  49. lock.Lock()
  50. if sheddingStat == nil {
  51. sheddingStat = load.NewSheddingStat(serviceType)
  52. }
  53. lock.Unlock()
  54. }