host_entry.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package hostpool
  2. import (
  3. "log"
  4. "math"
  5. "time"
  6. )
  7. type HostEntry interface {
  8. IsDead() bool
  9. Host() string
  10. SetDead(bool)
  11. canTryHost(time.Time) bool
  12. willRetryHost()
  13. }
  14. // -- Requests
  15. type hostEntryRequest interface {
  16. getRespChan() chan<- interface{}
  17. }
  18. type baseHostEntryRequest struct {
  19. respChan chan interface{}
  20. }
  21. func (req *baseHostEntryRequest) getRespChan() chan<- interface{} {
  22. return req.respChan
  23. }
  24. type isDeadRequest struct{ baseHostEntryRequest }
  25. type setDeadRequest struct {
  26. baseHostEntryRequest
  27. setDeadTo bool
  28. }
  29. type canTryRequest struct {
  30. baseHostEntryRequest
  31. atTime time.Time
  32. }
  33. type willRetryRequest struct{ baseHostEntryRequest }
  34. type hostEntry struct {
  35. host string
  36. nextRetry time.Time
  37. retryDelay time.Duration
  38. initialRetryDelay time.Duration
  39. maxRetryInterval time.Duration
  40. dead bool
  41. incomingRequests chan hostEntryRequest
  42. }
  43. func (he *hostEntry) Host() string {
  44. // This never changes, so we can safely return it
  45. return he.host
  46. }
  47. func newHostEntry(host string, initialRetryDelay time.Duration, maxRetryInterval time.Duration) HostEntry {
  48. he := &hostEntry{
  49. host: host,
  50. retryDelay: initialRetryDelay,
  51. initialRetryDelay: initialRetryDelay,
  52. maxRetryInterval: maxRetryInterval,
  53. incomingRequests: make(chan hostEntryRequest),
  54. }
  55. go he.handleRequests()
  56. return he
  57. }
  58. func (he *hostEntry) handleRequests() {
  59. for req := range he.incomingRequests {
  60. var resp interface{}
  61. switch req.(type) {
  62. case *isDeadRequest:
  63. resp = he.dead
  64. case *setDeadRequest:
  65. newVal := req.(*setDeadRequest).setDeadTo
  66. if newVal && !he.dead {
  67. // Entering the deadpool - initialize retry
  68. he.retryDelay = he.initialRetryDelay
  69. he.nextRetry = time.Now().Add(he.retryDelay)
  70. }
  71. he.dead = newVal
  72. case *canTryRequest:
  73. resp = !he.dead || he.nextRetry.Before(req.(*canTryRequest).atTime)
  74. case *willRetryRequest:
  75. he.retryDelay = time.Duration(int64(math.Min(float64(he.retryDelay*2), float64(he.maxRetryInterval))))
  76. he.nextRetry = time.Now().Add(he.retryDelay)
  77. }
  78. req.getRespChan() <- resp
  79. }
  80. }
  81. func (he *hostEntry) IsDead() bool {
  82. req := &isDeadRequest{
  83. baseHostEntryRequest{
  84. respChan: make(chan interface{}),
  85. },
  86. }
  87. he.incomingRequests <- req
  88. resp := <-req.respChan
  89. isDeadResp, ok := resp.(bool)
  90. if !ok {
  91. log.Fatal("Got incorrect response type from host_entry muxer in IsDead")
  92. }
  93. return isDeadResp
  94. }
  95. func (he *hostEntry) SetDead(newDeadVal bool) {
  96. req := &setDeadRequest{
  97. baseHostEntryRequest{
  98. respChan: make(chan interface{}),
  99. },
  100. newDeadVal,
  101. }
  102. he.incomingRequests <- req
  103. <-req.respChan
  104. }
  105. func (he *hostEntry) canTryHost(now time.Time) bool {
  106. req := &canTryRequest{
  107. baseHostEntryRequest{
  108. respChan: make(chan interface{}),
  109. },
  110. now,
  111. }
  112. he.incomingRequests <- req
  113. resp := <-req.respChan
  114. canTryResp, ok := resp.(bool)
  115. if !ok {
  116. log.Fatal("Got incorrect response type from host_entry muxer in canTryHost")
  117. }
  118. return canTryResp
  119. }
  120. func (he *hostEntry) willRetryHost() {
  121. req := &willRetryRequest{
  122. baseHostEntryRequest{
  123. respChan: make(chan interface{}),
  124. },
  125. }
  126. he.incomingRequests <- req
  127. <-req.respChan
  128. }