peer.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package etcd
  2. import (
  3. "bytes"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "sync"
  8. "sync/atomic"
  9. )
  10. const (
  11. maxInflight = 4
  12. )
  13. const (
  14. // participant is defined in etcd.go
  15. idle = iota + 1
  16. stopped
  17. )
  18. type peer struct {
  19. url string
  20. queue chan []byte
  21. status int
  22. inflight atomicInt
  23. c *http.Client
  24. mu sync.RWMutex
  25. wg sync.WaitGroup
  26. }
  27. func newPeer(url string, c *http.Client) *peer {
  28. return &peer{
  29. url: url,
  30. status: idle,
  31. c: c,
  32. }
  33. }
  34. func (p *peer) participate() {
  35. p.mu.Lock()
  36. defer p.mu.Unlock()
  37. p.queue = make(chan []byte)
  38. p.status = participant
  39. for i := 0; i < maxInflight; i++ {
  40. p.wg.Add(1)
  41. go p.handle(p.queue)
  42. }
  43. }
  44. func (p *peer) idle() {
  45. p.mu.Lock()
  46. defer p.mu.Unlock()
  47. if p.status == participant {
  48. close(p.queue)
  49. }
  50. p.status = idle
  51. }
  52. func (p *peer) stop() {
  53. p.mu.Lock()
  54. if p.status == participant {
  55. close(p.queue)
  56. }
  57. p.status = stopped
  58. p.mu.Unlock()
  59. p.wg.Wait()
  60. }
  61. func (p *peer) handle(queue chan []byte) {
  62. defer p.wg.Done()
  63. for d := range queue {
  64. p.post(d)
  65. }
  66. }
  67. func (p *peer) send(d []byte) error {
  68. p.mu.Lock()
  69. defer p.mu.Unlock()
  70. switch p.status {
  71. case participant:
  72. select {
  73. case p.queue <- d:
  74. default:
  75. return fmt.Errorf("reach max serving")
  76. }
  77. case idle:
  78. if p.inflight.Get() > maxInflight {
  79. return fmt.Errorf("reach max idle")
  80. }
  81. go func() {
  82. p.wg.Add(1)
  83. p.post(d)
  84. p.wg.Done()
  85. }()
  86. case stopped:
  87. return fmt.Errorf("sender stopped")
  88. }
  89. return nil
  90. }
  91. func (p *peer) post(d []byte) {
  92. p.inflight.Add(1)
  93. defer p.inflight.Add(-1)
  94. buf := bytes.NewBuffer(d)
  95. resp, err := p.c.Post(p.url, "application/octet-stream", buf)
  96. if err != nil {
  97. log.Println("post:", err)
  98. return
  99. }
  100. resp.Body.Close()
  101. }
  102. // An AtomicInt is an int64 to be accessed atomically.
  103. type atomicInt int64
  104. func (i *atomicInt) Add(d int64) {
  105. atomic.AddInt64((*int64)(i), d)
  106. }
  107. func (i *atomicInt) Get() int64 {
  108. return atomic.LoadInt64((*int64)(i))
  109. }