selector.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package hostpool
  2. import (
  3. "log"
  4. "sync"
  5. "time"
  6. )
  7. type Selector interface {
  8. Init([]string)
  9. SelectNextHost() string
  10. MakeHostResponse(string) HostPoolResponse
  11. MarkHost(string, error)
  12. ResetAll()
  13. }
  14. type standardSelector struct {
  15. sync.RWMutex
  16. hosts map[string]*hostEntry
  17. hostList []*hostEntry
  18. initialRetryDelay time.Duration
  19. maxRetryInterval time.Duration
  20. nextHostIndex int
  21. }
  22. func (s *standardSelector) Init(hosts []string) {
  23. s.hosts = make(map[string]*hostEntry, len(hosts))
  24. s.hostList = make([]*hostEntry, len(hosts))
  25. s.initialRetryDelay = time.Duration(30) * time.Second
  26. s.maxRetryInterval = time.Duration(900) * time.Second
  27. for i, h := range hosts {
  28. e := &hostEntry{
  29. host: h,
  30. retryDelay: s.initialRetryDelay,
  31. }
  32. s.hosts[h] = e
  33. s.hostList[i] = e
  34. }
  35. }
  36. func (s *standardSelector) SelectNextHost() string {
  37. s.Lock()
  38. host := s.getRoundRobin()
  39. s.Unlock()
  40. return host
  41. }
  42. func (s *standardSelector) getRoundRobin() string {
  43. now := time.Now()
  44. hostCount := len(s.hostList)
  45. for i := range s.hostList {
  46. // iterate via sequenece from where we last iterated
  47. currentIndex := (i + s.nextHostIndex) % hostCount
  48. h := s.hostList[currentIndex]
  49. if h.canTryHost(now) {
  50. s.nextHostIndex = currentIndex + 1
  51. return h.host
  52. }
  53. }
  54. // all hosts are down. re-add them
  55. s.doResetAll()
  56. s.nextHostIndex = 0
  57. return s.hostList[0].host
  58. }
  59. func (s *standardSelector) MakeHostResponse(host string) HostPoolResponse {
  60. s.Lock()
  61. defer s.Unlock()
  62. h, ok := s.hosts[host]
  63. if !ok {
  64. log.Fatalf("host %s not in HostPool", host)
  65. }
  66. now := time.Now()
  67. if h.dead && h.nextRetry.Before(now) {
  68. h.willRetryHost(s.maxRetryInterval)
  69. }
  70. return &standardHostPoolResponse{host: host, ss: s}
  71. }
  72. func (s *standardSelector) MarkHost(host string, err error) {
  73. s.Lock()
  74. defer s.Unlock()
  75. h, ok := s.hosts[host]
  76. if !ok {
  77. log.Fatalf("host %s not in HostPool", host)
  78. }
  79. if err == nil {
  80. // success - mark host alive
  81. h.dead = false
  82. } else {
  83. // failure - mark host dead
  84. if !h.dead {
  85. h.dead = true
  86. h.retryCount = 0
  87. h.retryDelay = s.initialRetryDelay
  88. h.nextRetry = time.Now().Add(h.retryDelay)
  89. }
  90. }
  91. }
  92. func (s *standardSelector) ResetAll() {
  93. s.Lock()
  94. defer s.Unlock()
  95. s.doResetAll()
  96. }
  97. // this actually performs the logic to reset,
  98. // and should only be called when the lock has
  99. // already been acquired
  100. func (s *standardSelector) doResetAll() {
  101. for _, h := range s.hosts {
  102. h.dead = false
  103. }
  104. }