host_entry.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package hostpool
  2. import (
  3. "math"
  4. "time"
  5. )
  6. type HostEntry interface {
  7. IsDead() bool
  8. Host() string
  9. SetDead(bool)
  10. canTryHost(time.Time) bool
  11. willRetryHost()
  12. }
  13. // -- Requests
  14. type hostEntryRequest interface {
  15. getRespChan() chan<- interface{}
  16. }
  17. type baseHostEntryRequest struct {
  18. respChan chan interface{}
  19. }
  20. func (req *baseHostEntryRequest) getRespChan() chan<- interface{} {
  21. return req.respChan
  22. }
  23. type isDeadRequest struct{ baseHostEntryRequest }
  24. type setDeadRequest struct {
  25. baseHostEntryRequest
  26. setDeadTo bool
  27. }
  28. type canTryRequest struct {
  29. baseHostEntryRequest
  30. atTime time.Time
  31. }
  32. type willRetryRequest struct{ baseHostEntryRequest }
  33. type hostEntry struct {
  34. host string
  35. nextRetry time.Time
  36. retryDelay time.Duration
  37. initialRetryDelay time.Duration
  38. maxRetryInterval time.Duration
  39. dead bool
  40. // epsilonCounts []int64
  41. // epsilonValues []int64
  42. // epsilonIndex int
  43. // epsilonValue float64
  44. // epsilonPercentage float64
  45. incomingRequests chan hostEntryRequest
  46. }
  47. func (he *hostEntry) Host() string {
  48. // This never changes, so we can safely return it
  49. return he.host
  50. }
  51. func newHostEntry(host string, initialRetryDelay time.Duration, maxRetryInterval time.Duration) HostEntry {
  52. he := &hostEntry{
  53. host: host,
  54. retryDelay: initialRetryDelay,
  55. initialRetryDelay: initialRetryDelay,
  56. maxRetryInterval: maxRetryInterval,
  57. incomingRequests: make(chan hostEntryRequest),
  58. }
  59. go he.handleRequests()
  60. return he
  61. }
  62. func (he *hostEntry) handleRequests() {
  63. for req := range he.incomingRequests {
  64. var resp interface{}
  65. switch req.(type) {
  66. case *isDeadRequest:
  67. resp = he.dead
  68. case *setDeadRequest:
  69. newVal := req.(*setDeadRequest).setDeadTo
  70. if newVal && !he.dead {
  71. // Entering the deadpool - initialize retry
  72. he.retryDelay = he.initialRetryDelay
  73. he.nextRetry = time.Now().Add(he.retryDelay)
  74. }
  75. he.dead = newVal
  76. case *canTryRequest:
  77. resp = !he.dead || he.nextRetry.Before(req.(*canTryRequest).atTime)
  78. case *willRetryRequest:
  79. he.retryDelay = time.Duration(int64(math.Min(float64(he.retryDelay*2), float64(he.maxRetryInterval))))
  80. he.nextRetry = time.Now().Add(he.retryDelay)
  81. }
  82. req.getRespChan() <- resp
  83. }
  84. }
  85. func (he *hostEntry) IsDead() bool {
  86. req := &isDeadRequest{
  87. baseHostEntryRequest{
  88. respChan: make(chan interface{}),
  89. },
  90. }
  91. he.incomingRequests <- req
  92. resp := <-req.respChan
  93. isDeadResp, ok := resp.(bool)
  94. if !ok {
  95. // TODO
  96. }
  97. return isDeadResp
  98. }
  99. func (he *hostEntry) SetDead(newDeadVal bool) {
  100. req := &setDeadRequest{
  101. baseHostEntryRequest{
  102. respChan: make(chan interface{}),
  103. },
  104. newDeadVal,
  105. }
  106. he.incomingRequests <- req
  107. <-req.respChan
  108. }
  109. func (he *hostEntry) canTryHost(now time.Time) bool {
  110. req := &canTryRequest{
  111. baseHostEntryRequest{
  112. respChan: make(chan interface{}),
  113. },
  114. now,
  115. }
  116. he.incomingRequests <- req
  117. resp := <-req.respChan
  118. canTryResp, ok := resp.(bool)
  119. if !ok {
  120. // TODO
  121. }
  122. return canTryResp
  123. }
  124. func (he *hostEntry) willRetryHost() {
  125. req := &willRetryRequest{
  126. baseHostEntryRequest{
  127. respChan: make(chan interface{}),
  128. },
  129. }
  130. he.incomingRequests <- req
  131. <-req.respChan
  132. }