host_entry.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package hostpool
  2. import (
  3. "log"
  4. "math"
  5. "time"
  6. )
  7. type HostEntry interface {
  8. IsDead() bool
  9. Host() string
  10. SetDead(bool)
  11. canTryHost(time.Time) bool
  12. willRetryHost()
  13. Close()
  14. }
  15. // -- Requests
  16. type hostEntryRequest interface {
  17. getRespChan() chan<- interface{}
  18. }
  19. type baseHostEntryRequest struct {
  20. respChan chan interface{}
  21. }
  22. func (req *baseHostEntryRequest) getRespChan() chan<- interface{} {
  23. return req.respChan
  24. }
  25. type isDeadRequest struct{ baseHostEntryRequest }
  26. type setDeadRequest struct {
  27. baseHostEntryRequest
  28. setDeadTo bool
  29. }
  30. type canTryRequest struct {
  31. baseHostEntryRequest
  32. atTime time.Time
  33. }
  34. type willRetryRequest struct{ baseHostEntryRequest }
  35. type hostEntry struct {
  36. host string
  37. nextRetry time.Time
  38. retryDelay time.Duration
  39. initialRetryDelay time.Duration
  40. maxRetryInterval time.Duration
  41. dead bool
  42. incomingRequests chan hostEntryRequest
  43. }
  44. func (he *hostEntry) Host() string {
  45. // This never changes, so we can safely return it
  46. return he.host
  47. }
  48. func newHostEntry(host string, initialRetryDelay time.Duration, maxRetryInterval time.Duration) HostEntry {
  49. he := &hostEntry{
  50. host: host,
  51. retryDelay: initialRetryDelay,
  52. initialRetryDelay: initialRetryDelay,
  53. maxRetryInterval: maxRetryInterval,
  54. incomingRequests: make(chan hostEntryRequest),
  55. }
  56. go he.handleRequests()
  57. return he
  58. }
  59. func (he *hostEntry) handleRequests() {
  60. for req := range he.incomingRequests {
  61. var resp interface{}
  62. switch req.(type) {
  63. case *isDeadRequest:
  64. resp = he.dead
  65. case *setDeadRequest:
  66. newVal := req.(*setDeadRequest).setDeadTo
  67. if newVal && !he.dead {
  68. // Entering the deadpool - initialize retry
  69. he.retryDelay = he.initialRetryDelay
  70. he.nextRetry = time.Now().Add(he.retryDelay)
  71. }
  72. he.dead = newVal
  73. case *canTryRequest:
  74. resp = !he.dead || he.nextRetry.Before(req.(*canTryRequest).atTime)
  75. case *willRetryRequest:
  76. he.retryDelay = time.Duration(int64(math.Min(float64(he.retryDelay*2), float64(he.maxRetryInterval))))
  77. he.nextRetry = time.Now().Add(he.retryDelay)
  78. }
  79. req.getRespChan() <- resp
  80. }
  81. }
  82. func (he *hostEntry) IsDead() bool {
  83. req := &isDeadRequest{
  84. baseHostEntryRequest{
  85. respChan: make(chan interface{}),
  86. },
  87. }
  88. he.incomingRequests <- req
  89. resp := <-req.respChan
  90. isDeadResp, ok := resp.(bool)
  91. if !ok {
  92. log.Fatal("Got incorrect response type from host_entry muxer in IsDead")
  93. }
  94. return isDeadResp
  95. }
  96. func (he *hostEntry) SetDead(newDeadVal bool) {
  97. req := &setDeadRequest{
  98. baseHostEntryRequest{
  99. respChan: make(chan interface{}),
  100. },
  101. newDeadVal,
  102. }
  103. he.incomingRequests <- req
  104. <-req.respChan
  105. }
  106. func (he *hostEntry) canTryHost(now time.Time) bool {
  107. req := &canTryRequest{
  108. baseHostEntryRequest{
  109. respChan: make(chan interface{}),
  110. },
  111. now,
  112. }
  113. he.incomingRequests <- req
  114. resp := <-req.respChan
  115. canTryResp, ok := resp.(bool)
  116. if !ok {
  117. log.Fatal("Got incorrect response type from host_entry muxer in canTryHost")
  118. }
  119. return canTryResp
  120. }
  121. func (he *hostEntry) willRetryHost() {
  122. req := &willRetryRequest{
  123. baseHostEntryRequest{
  124. respChan: make(chan interface{}),
  125. },
  126. }
  127. he.incomingRequests <- req
  128. <-req.respChan
  129. }
  130. func (he *hostEntry) Close() {
  131. close(he.incomingRequests)
  132. }