prober.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package probing
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "net/http"
  6. "sync"
  7. "time"
  8. )
  9. var (
  10. ErrNotFound = errors.New("probing: id not found")
  11. ErrExist = errors.New("probing: id exists")
  12. )
  13. type Prober interface {
  14. AddHTTP(id string, probingInterval time.Duration, endpoints []string) error
  15. Remove(id string) error
  16. RemoveAll()
  17. Reset(id string) error
  18. Status(id string) (Status, error)
  19. }
  20. type prober struct {
  21. mu sync.Mutex
  22. targets map[string]*status
  23. tr http.RoundTripper
  24. }
  25. func NewProber(tr http.RoundTripper) Prober {
  26. p := &prober{targets: make(map[string]*status)}
  27. if tr == nil {
  28. p.tr = http.DefaultTransport
  29. } else {
  30. p.tr = tr
  31. }
  32. return p
  33. }
  34. func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
  35. p.mu.Lock()
  36. defer p.mu.Unlock()
  37. if _, ok := p.targets[id]; ok {
  38. return ErrExist
  39. }
  40. s := &status{stopC: make(chan struct{})}
  41. p.targets[id] = s
  42. ticker := time.NewTicker(probingInterval)
  43. go func() {
  44. pinned := 0
  45. for {
  46. select {
  47. case <-ticker.C:
  48. start := time.Now()
  49. req, err := http.NewRequest("GET", endpoints[pinned], nil)
  50. if err != nil {
  51. panic(err)
  52. }
  53. resp, err := p.tr.RoundTrip(req)
  54. if err != nil {
  55. s.recordFailure(err)
  56. pinned = (pinned + 1) % len(endpoints)
  57. continue
  58. }
  59. var hh Health
  60. d := json.NewDecoder(resp.Body)
  61. err = d.Decode(&hh)
  62. resp.Body.Close()
  63. if err != nil || !hh.OK {
  64. s.recordFailure(err)
  65. pinned = (pinned + 1) % len(endpoints)
  66. continue
  67. }
  68. s.record(time.Since(start), hh.Now)
  69. case <-s.stopC:
  70. ticker.Stop()
  71. return
  72. }
  73. }
  74. }()
  75. return nil
  76. }
  77. func (p *prober) Remove(id string) error {
  78. p.mu.Lock()
  79. defer p.mu.Unlock()
  80. s, ok := p.targets[id]
  81. if !ok {
  82. return ErrNotFound
  83. }
  84. close(s.stopC)
  85. delete(p.targets, id)
  86. return nil
  87. }
  88. func (p *prober) RemoveAll() {
  89. p.mu.Lock()
  90. defer p.mu.Unlock()
  91. for _, s := range p.targets {
  92. close(s.stopC)
  93. }
  94. p.targets = make(map[string]*status)
  95. }
  96. func (p *prober) Reset(id string) error {
  97. p.mu.Lock()
  98. defer p.mu.Unlock()
  99. s, ok := p.targets[id]
  100. if !ok {
  101. return ErrNotFound
  102. }
  103. s.reset()
  104. return nil
  105. }
  106. func (p *prober) Status(id string) (Status, error) {
  107. p.mu.Lock()
  108. defer p.mu.Unlock()
  109. s, ok := p.targets[id]
  110. if !ok {
  111. return nil, ErrNotFound
  112. }
  113. return s, nil
  114. }