transporter.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package etcd
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io/ioutil"
  9. "log"
  10. "net/http"
  11. "net/url"
  12. "path"
  13. "strconv"
  14. "sync"
  15. "github.com/coreos/etcd/raft"
  16. )
  17. var (
  18. errUnknownNode = errors.New("unknown node")
  19. )
  20. type transporter struct {
  21. mu sync.RWMutex
  22. stopped bool
  23. urls map[int64]string
  24. recv chan *raft.Message
  25. client *http.Client
  26. wg sync.WaitGroup
  27. *http.ServeMux
  28. }
  29. func newTransporter(tc *tls.Config) *transporter {
  30. tr := new(http.Transport)
  31. tr.TLSClientConfig = tc
  32. c := &http.Client{Transport: tr}
  33. t := &transporter{
  34. urls: make(map[int64]string),
  35. recv: make(chan *raft.Message, 512),
  36. client: c,
  37. }
  38. t.ServeMux = http.NewServeMux()
  39. t.ServeMux.HandleFunc("/raft/cfg/", t.serveCfg)
  40. t.ServeMux.HandleFunc("/raft", t.serveRaft)
  41. return t
  42. }
  43. func (t *transporter) stop() {
  44. t.mu.Lock()
  45. t.stopped = true
  46. t.mu.Unlock()
  47. t.wg.Wait()
  48. tr := t.client.Transport.(*http.Transport)
  49. tr.CloseIdleConnections()
  50. }
  51. func (t *transporter) set(nodeId int64, rawurl string) error {
  52. u, err := url.Parse(rawurl)
  53. if err != nil {
  54. return err
  55. }
  56. u.Path = raftPrefix
  57. t.mu.Lock()
  58. t.urls[nodeId] = u.String()
  59. t.mu.Unlock()
  60. return nil
  61. }
  62. func (t *transporter) sendTo(nodeId int64, data []byte) error {
  63. t.mu.RLock()
  64. url := t.urls[nodeId]
  65. t.mu.RUnlock()
  66. if len(url) == 0 {
  67. return errUnknownNode
  68. }
  69. return t.send(url, data)
  70. }
  71. func (t *transporter) send(addr string, data []byte) error {
  72. t.mu.RLock()
  73. if t.stopped {
  74. t.mu.RUnlock()
  75. return fmt.Errorf("transporter stopped")
  76. }
  77. t.wg.Add(1)
  78. defer t.wg.Done()
  79. t.mu.RUnlock()
  80. buf := bytes.NewBuffer(data)
  81. resp, err := t.client.Post(addr, "application/octet-stream", buf)
  82. if err != nil {
  83. return err
  84. }
  85. resp.Body.Close()
  86. return nil
  87. }
  88. func (t *transporter) fetchAddr(seedurl string, id int64) error {
  89. u, err := url.Parse(seedurl)
  90. if err != nil {
  91. return fmt.Errorf("cannot parse the url of the given seed")
  92. }
  93. u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
  94. resp, err := t.client.Get(u.String())
  95. if err != nil {
  96. return fmt.Errorf("cannot reach %v", u)
  97. }
  98. defer resp.Body.Close()
  99. b, err := ioutil.ReadAll(resp.Body)
  100. if err != nil {
  101. return fmt.Errorf("cannot reach %v", u)
  102. }
  103. if err := t.set(id, string(b)); err != nil {
  104. return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
  105. }
  106. return nil
  107. }
  108. func (t *transporter) serveRaft(w http.ResponseWriter, r *http.Request) {
  109. msg := new(raft.Message)
  110. if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
  111. log.Println(err)
  112. return
  113. }
  114. select {
  115. case t.recv <- msg:
  116. default:
  117. log.Println("drop")
  118. // drop the incoming package at network layer if the upper layer
  119. // cannot consume them in time.
  120. // TODO(xiangli): not return 200.
  121. }
  122. return
  123. }
  124. func (t *transporter) serveCfg(w http.ResponseWriter, r *http.Request) {
  125. id, err := strconv.ParseInt(r.URL.Path[len("/raft/cfg/"):], 10, 64)
  126. if err != nil {
  127. http.Error(w, err.Error(), http.StatusBadRequest)
  128. return
  129. }
  130. t.mu.RLock()
  131. u, ok := t.urls[id]
  132. t.mu.RUnlock()
  133. if ok {
  134. w.Write([]byte(u))
  135. return
  136. }
  137. http.Error(w, "Not Found", http.StatusNotFound)
  138. }