peer.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package etcd
import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
	"sync/atomic"
)
  11. const (
  12. maxInflight = 4
  13. )
  14. const (
  15. // participant is defined in etcd.go
  16. idle = iota + 1
  17. stopped
  18. )
  19. var (
  20. errUnknownNode = errors.New("unknown node")
  21. )
  22. type peer struct {
  23. url string
  24. queue chan []byte
  25. status int
  26. inflight atomicInt
  27. c *http.Client
  28. mu sync.RWMutex
  29. wg sync.WaitGroup
  30. }
  31. func newPeer(url string, c *http.Client) *peer {
  32. return &peer{
  33. url: url,
  34. status: idle,
  35. c: c,
  36. }
  37. }
  38. func (p *peer) participate() {
  39. p.mu.Lock()
  40. defer p.mu.Unlock()
  41. p.queue = make(chan []byte)
  42. p.status = participant
  43. for i := 0; i < maxInflight; i++ {
  44. p.wg.Add(1)
  45. go p.handle(p.queue)
  46. }
  47. }
  48. func (p *peer) idle() {
  49. p.mu.Lock()
  50. defer p.mu.Unlock()
  51. if p.status == participant {
  52. close(p.queue)
  53. }
  54. p.status = idle
  55. }
  56. func (p *peer) stop() {
  57. p.mu.Lock()
  58. if p.status == participant {
  59. close(p.queue)
  60. }
  61. p.status = stopped
  62. p.mu.Unlock()
  63. p.wg.Wait()
  64. }
  65. func (p *peer) handle(queue chan []byte) {
  66. defer p.wg.Done()
  67. for d := range queue {
  68. p.post(d)
  69. }
  70. }
  71. func (p *peer) send(d []byte) error {
  72. p.mu.Lock()
  73. defer p.mu.Unlock()
  74. switch p.status {
  75. case participant:
  76. select {
  77. case p.queue <- d:
  78. default:
  79. return fmt.Errorf("reach max serving")
  80. }
  81. case idle:
  82. if p.inflight.Get() > maxInflight {
  83. return fmt.Errorf("reach max idle")
  84. }
  85. go func() {
  86. p.wg.Add(1)
  87. p.post(d)
  88. p.wg.Done()
  89. }()
  90. case stopped:
  91. return fmt.Errorf("sender stopped")
  92. }
  93. return nil
  94. }
  95. func (p *peer) post(d []byte) {
  96. p.inflight.Add(1)
  97. defer p.inflight.Add(-1)
  98. buf := bytes.NewBuffer(d)
  99. resp, err := p.c.Post(p.url, "application/octet-stream", buf)
  100. if err != nil {
  101. log.Println("post:", err)
  102. return
  103. }
  104. resp.Body.Close()
  105. }
  106. // An AtomicInt is an int64 to be accessed atomically.
  107. type atomicInt int64
  108. func (i *atomicInt) Add(d int64) {
  109. atomic.AddInt64((*int64)(i), d)
  110. }
  111. func (i *atomicInt) Get() int64 {
  112. return atomic.LoadInt64((*int64)(i))
  113. }