prober.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package probing
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "sync"
  8. "time"
  9. )
  10. var (
  11. ErrNotFound = errors.New("probing: id not found")
  12. ErrExist = errors.New("probing: id exists")
  13. )
  14. type Prober interface {
  15. AddHTTP(id string, probingInterval time.Duration, endpoints []string) error
  16. Remove(id string) error
  17. RemoveAll()
  18. Reset(id string) error
  19. Status(id string) (Status, error)
  20. }
  21. type prober struct {
  22. mu sync.Mutex
  23. targets map[string]*status
  24. tr http.RoundTripper
  25. }
  26. func NewProber(tr http.RoundTripper) Prober {
  27. p := &prober{targets: make(map[string]*status)}
  28. if tr == nil {
  29. p.tr = http.DefaultTransport
  30. } else {
  31. p.tr = tr
  32. }
  33. return p
  34. }
  35. func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
  36. p.mu.Lock()
  37. defer p.mu.Unlock()
  38. if _, ok := p.targets[id]; ok {
  39. return ErrExist
  40. }
  41. s := &status{stopC: make(chan struct{})}
  42. p.targets[id] = s
  43. ticker := time.NewTicker(probingInterval)
  44. go func() {
  45. pinned := 0
  46. for {
  47. select {
  48. case <-ticker.C:
  49. start := time.Now()
  50. req, err := http.NewRequest("GET", endpoints[pinned], nil)
  51. if err != nil {
  52. panic(err)
  53. }
  54. resp, err := p.tr.RoundTrip(req)
  55. if err == nil && resp.StatusCode != http.StatusOK {
  56. err = fmt.Errorf("got unexpected HTTP status code %s from %s", resp.Status, endpoints[pinned])
  57. resp.Body.Close()
  58. }
  59. if err != nil {
  60. s.recordFailure(err)
  61. pinned = (pinned + 1) % len(endpoints)
  62. continue
  63. }
  64. var hh Health
  65. d := json.NewDecoder(resp.Body)
  66. err = d.Decode(&hh)
  67. resp.Body.Close()
  68. if err != nil || !hh.OK {
  69. s.recordFailure(err)
  70. pinned = (pinned + 1) % len(endpoints)
  71. continue
  72. }
  73. s.record(time.Since(start), hh.Now)
  74. case <-s.stopC:
  75. ticker.Stop()
  76. return
  77. }
  78. }
  79. }()
  80. return nil
  81. }
  82. func (p *prober) Remove(id string) error {
  83. p.mu.Lock()
  84. defer p.mu.Unlock()
  85. s, ok := p.targets[id]
  86. if !ok {
  87. return ErrNotFound
  88. }
  89. close(s.stopC)
  90. delete(p.targets, id)
  91. return nil
  92. }
  93. func (p *prober) RemoveAll() {
  94. p.mu.Lock()
  95. defer p.mu.Unlock()
  96. for _, s := range p.targets {
  97. close(s.stopC)
  98. }
  99. p.targets = make(map[string]*status)
  100. }
  101. func (p *prober) Reset(id string) error {
  102. p.mu.Lock()
  103. defer p.mu.Unlock()
  104. s, ok := p.targets[id]
  105. if !ok {
  106. return ErrNotFound
  107. }
  108. s.reset()
  109. return nil
  110. }
  111. func (p *prober) Status(id string) (Status, error) {
  112. p.mu.Lock()
  113. defer p.mu.Unlock()
  114. s, ok := p.targets[id]
  115. if !ok {
  116. return nil, ErrNotFound
  117. }
  118. return s, nil
  119. }