host_entry.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package hostpool
  2. import (
  3. "math"
  4. "time"
  5. )
  // HostEntry is the interface through which one host's liveness flag and
  // retry state are queried and updated.
  type HostEntry interface {
  	// Host returns the host string the entry was created with.
  	Host() string
  	// IsDead reports whether the host is currently marked dead.
  	IsDead() bool
  	// SetDead marks the host dead (true) or alive (false).
  	SetDead(dead bool)

  	// canTryHost reports whether the host may be tried at the instant at.
  	// The hostEntry implementation answers true when the host is not dead
  	// or when its scheduled next retry lies before at.
  	canTryHost(at time.Time) bool
  	// willRetryHost records that a retry is being made. The hostEntry
  	// implementation doubles its retry delay, capped at the maximum retry
  	// interval, and schedules the next retry that far from now.
  	willRetryHost()
  }
  // -- Requests

  // hostEntryRequest is a message handed to a hostEntry's request loop. The
  // loop replies to every request on the channel the request exposes.
  type hostEntryRequest interface {
  	// getRespChan returns the send-only channel on which the reply to this
  	// request is delivered.
  	getRespChan() chan<- interface{}
  }
  // baseHostEntryRequest holds the reply channel common to every request
  // type. Concrete requests embed it and thereby pick up getRespChan.
  type baseHostEntryRequest struct {
  	// respChan carries the single reply for this request back to the caller.
  	respChan chan interface{}
  }

  // getRespChan exposes respChan restricted to sending. It has a pointer
  // receiver, so it is pointers to the embedding request types that satisfy
  // hostEntryRequest.
  func (r *baseHostEntryRequest) getRespChan() chan<- interface{} {
  	var replyTo chan<- interface{} = r.respChan
  	return replyTo
  }
  23. type isDeadRequest struct{ baseHostEntryRequest }
  24. type setDeadRequest struct {
  25. baseHostEntryRequest
  26. setDeadTo bool
  27. }
  28. type canTryRequest struct {
  29. baseHostEntryRequest
  30. atTime time.Time
  31. }
  32. type willRetryRequest struct{ baseHostEntryRequest }
  33. type hostEntry struct {
  34. host string
  35. nextRetry time.Time
  36. retryDelay time.Duration
  37. initialRetryDelay time.Duration
  38. maxRetryInterval time.Duration
  39. dead bool
  40. incomingRequests chan hostEntryRequest
  41. }
  42. func (he *hostEntry) Host() string {
  43. // This never changes, so we can safely return it
  44. return he.host
  45. }
  46. func newHostEntry(host string, initialRetryDelay time.Duration, maxRetryInterval time.Duration) HostEntry {
  47. he := &hostEntry{
  48. host: host,
  49. retryDelay: initialRetryDelay,
  50. initialRetryDelay: initialRetryDelay,
  51. maxRetryInterval: maxRetryInterval,
  52. incomingRequests: make(chan hostEntryRequest),
  53. }
  54. go he.handleRequests()
  55. return he
  56. }
  57. func (he *hostEntry) handleRequests() {
  58. for req := range he.incomingRequests {
  59. var resp interface{}
  60. switch req.(type) {
  61. case *isDeadRequest:
  62. resp = he.dead
  63. case *setDeadRequest:
  64. newVal := req.(*setDeadRequest).setDeadTo
  65. if newVal && !he.dead {
  66. // Entering the deadpool - initialize retry
  67. he.retryDelay = he.initialRetryDelay
  68. he.nextRetry = time.Now().Add(he.retryDelay)
  69. }
  70. he.dead = newVal
  71. case *canTryRequest:
  72. resp = !he.dead || he.nextRetry.Before(req.(*canTryRequest).atTime)
  73. case *willRetryRequest:
  74. he.retryDelay = time.Duration(int64(math.Min(float64(he.retryDelay*2), float64(he.maxRetryInterval))))
  75. he.nextRetry = time.Now().Add(he.retryDelay)
  76. }
  77. req.getRespChan() <- resp
  78. }
  79. }
  80. func (he *hostEntry) IsDead() bool {
  81. req := &isDeadRequest{
  82. baseHostEntryRequest{
  83. respChan: make(chan interface{}),
  84. },
  85. }
  86. he.incomingRequests <- req
  87. resp := <-req.respChan
  88. isDeadResp, ok := resp.(bool)
  89. if !ok {
  90. // TODO
  91. }
  92. return isDeadResp
  93. }
  94. func (he *hostEntry) SetDead(newDeadVal bool) {
  95. req := &setDeadRequest{
  96. baseHostEntryRequest{
  97. respChan: make(chan interface{}),
  98. },
  99. newDeadVal,
  100. }
  101. he.incomingRequests <- req
  102. <-req.respChan
  103. }
  104. func (he *hostEntry) canTryHost(now time.Time) bool {
  105. req := &canTryRequest{
  106. baseHostEntryRequest{
  107. respChan: make(chan interface{}),
  108. },
  109. now,
  110. }
  111. he.incomingRequests <- req
  112. resp := <-req.respChan
  113. canTryResp, ok := resp.(bool)
  114. if !ok {
  115. // TODO
  116. }
  117. return canTryResp
  118. }
  119. func (he *hostEntry) willRetryHost() {
  120. req := &willRetryRequest{
  121. baseHostEntryRequest{
  122. respChan: make(chan interface{}),
  123. },
  124. }
  125. he.incomingRequests <- req
  126. <-req.respChan
  127. }