transporter.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package etcd
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. "net/url"
  11. "path"
  12. "sync"
  13. "github.com/coreos/etcd/raft"
  14. "github.com/coreos/etcd/store"
  15. )
  16. var (
  17. errUnknownNode = errors.New("unknown node")
  18. )
  19. type transporter struct {
  20. mu sync.RWMutex
  21. stopped bool
  22. urls map[int]string
  23. recv chan *raft.Message
  24. client *http.Client
  25. wg sync.WaitGroup
  26. }
  27. func newTransporter() *transporter {
  28. tr := new(http.Transport)
  29. c := &http.Client{Transport: tr}
  30. return &transporter{
  31. urls: make(map[int]string),
  32. recv: make(chan *raft.Message, 512),
  33. client: c,
  34. }
  35. }
  36. func (t *transporter) stop() {
  37. t.mu.Lock()
  38. t.stopped = true
  39. t.mu.Unlock()
  40. t.wg.Wait()
  41. tr := t.client.Transport.(*http.Transport)
  42. tr.CloseIdleConnections()
  43. }
  44. func (t *transporter) set(nodeId int, rawurl string) error {
  45. u, err := url.Parse(rawurl)
  46. if err != nil {
  47. return err
  48. }
  49. u.Path = raftPrefix
  50. t.mu.Lock()
  51. t.urls[nodeId] = u.String()
  52. t.mu.Unlock()
  53. return nil
  54. }
  55. func (t *transporter) sendTo(nodeId int, data []byte) error {
  56. t.mu.RLock()
  57. url := t.urls[nodeId]
  58. t.mu.RUnlock()
  59. if len(url) == 0 {
  60. return errUnknownNode
  61. }
  62. return t.send(url, data)
  63. }
  64. func (t *transporter) send(addr string, data []byte) error {
  65. t.mu.RLock()
  66. if t.stopped {
  67. t.mu.RUnlock()
  68. return fmt.Errorf("transporter stopped")
  69. }
  70. t.mu.RUnlock()
  71. buf := bytes.NewBuffer(data)
  72. t.wg.Add(1)
  73. defer t.wg.Done()
  74. resp, err := t.client.Post(addr, "application/octet-stream", buf)
  75. if err != nil {
  76. return err
  77. }
  78. resp.Body.Close()
  79. return nil
  80. }
  81. func (t *transporter) fetchAddr(seedurl string, id int) error {
  82. u, err := url.Parse(seedurl)
  83. if err != nil {
  84. return fmt.Errorf("cannot parse the url of the given seed")
  85. }
  86. u.Path = path.Join(v2Prefix, v2machineKVPrefix, fmt.Sprint(id))
  87. resp, err := t.client.Get(u.String())
  88. if err != nil {
  89. return fmt.Errorf("cannot reach %v", u)
  90. }
  91. defer resp.Body.Close()
  92. b, err := ioutil.ReadAll(resp.Body)
  93. if err != nil {
  94. return fmt.Errorf("cannot reach %v", u)
  95. }
  96. event := new(store.Event)
  97. err = json.Unmarshal(b, event)
  98. if err != nil {
  99. panic(fmt.Sprintf("fetchAddr: ", err))
  100. }
  101. if err := t.set(id, *event.Node.Value); err != nil {
  102. return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
  103. }
  104. return nil
  105. }
  106. func (t *transporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  107. msg := new(raft.Message)
  108. if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
  109. log.Println(err)
  110. return
  111. }
  112. select {
  113. case t.recv <- msg:
  114. default:
  115. log.Println("drop")
  116. // drop the incoming package at network layer if the upper layer
  117. // cannot consume them in time.
  118. // TODO(xiangli): not return 200.
  119. }
  120. return
  121. }