peers.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package etcdhttp
  2. import (
  3. "bytes"
  4. "fmt"
  5. "log"
  6. "math/rand"
  7. "net/http"
  8. "net/url"
  9. "sort"
  10. "strconv"
  11. "github.com/coreos/etcd/elog"
  12. "github.com/coreos/etcd/raft/raftpb"
  13. )
  14. // Peers contains a mapping of unique IDs to a list of hostnames/IP addresses
  15. type Peers map[int64][]string
  16. // addScheme adds the protocol prefix to a string; currently only HTTP
  17. // TODO: improve this when implementing TLS
  18. func addScheme(addr string) string {
  19. return fmt.Sprintf("http://%s", addr)
  20. }
  21. // Pick returns a random address from a given Peer's addresses. If the
  22. // given peer does not exist, an empty string is returned.
  23. func (ps Peers) Pick(id int64) string {
  24. addrs := ps[id]
  25. if len(addrs) == 0 {
  26. return ""
  27. }
  28. return addrs[rand.Intn(len(addrs))]
  29. }
  30. // Set parses command line sets of names to IPs formatted like:
  31. // a=1.1.1.1&a=1.1.1.2&b=2.2.2.2
  32. func (ps *Peers) Set(s string) error {
  33. m := make(map[int64][]string)
  34. v, err := url.ParseQuery(s)
  35. if err != nil {
  36. return err
  37. }
  38. for k, v := range v {
  39. id, err := strconv.ParseInt(k, 0, 64)
  40. if err != nil {
  41. return err
  42. }
  43. m[id] = v
  44. }
  45. *ps = m
  46. return nil
  47. }
  48. func (ps *Peers) String() string {
  49. v := url.Values{}
  50. for k, vv := range *ps {
  51. for i := range vv {
  52. v.Add(strconv.FormatInt(k, 16), vv[i])
  53. }
  54. }
  55. return v.Encode()
  56. }
  57. func (ps Peers) IDs() []int64 {
  58. var ids []int64
  59. for id := range ps {
  60. ids = append(ids, id)
  61. }
  62. return ids
  63. }
  64. // Endpoints returns a list of all peer addresses. Each address is prefixed
  65. // with the scheme (currently "http://"). The returned list is sorted in
  66. // ascending lexicographical order.
  67. func (ps Peers) Endpoints() []string {
  68. endpoints := make([]string, 0)
  69. for _, addrs := range ps {
  70. for _, addr := range addrs {
  71. endpoints = append(endpoints, addScheme(addr))
  72. }
  73. }
  74. sort.Strings(endpoints)
  75. return endpoints
  76. }
  77. // Addrs returns a list of all peer addresses. The returned list is sorted
  78. // in ascending lexicographical order.
  79. func (ps Peers) Addrs() []string {
  80. addrs := make([]string, 0)
  81. for _, paddrs := range ps {
  82. for _, paddr := range paddrs {
  83. addrs = append(addrs, paddr)
  84. }
  85. }
  86. sort.Strings(addrs)
  87. return addrs
  88. }
  89. func Sender(t *http.Transport, p Peers) func(msgs []raftpb.Message) {
  90. c := &http.Client{Transport: t}
  91. scheme := "http"
  92. if t.TLSClientConfig != nil {
  93. scheme = "https"
  94. }
  95. return func(msgs []raftpb.Message) {
  96. for _, m := range msgs {
  97. // TODO: reuse go routines
  98. // limit the number of outgoing connections for the same receiver
  99. go send(c, scheme, p, m)
  100. }
  101. }
  102. }
  103. func send(c *http.Client, scheme string, p Peers, m raftpb.Message) {
  104. // TODO (xiangli): reasonable retry logic
  105. for i := 0; i < 3; i++ {
  106. addr := p.Pick(m.To)
  107. if addr == "" {
  108. // TODO: unknown peer id.. what do we do? I
  109. // don't think his should ever happen, need to
  110. // look into this further.
  111. log.Printf("etcdhttp: no addr for %d", m.To)
  112. return
  113. }
  114. url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix)
  115. // TODO: don't block. we should be able to have 1000s
  116. // of messages out at a time.
  117. data, err := m.Marshal()
  118. if err != nil {
  119. log.Println("etcdhttp: dropping message:", err)
  120. return // drop bad message
  121. }
  122. if httpPost(c, url, data) {
  123. return // success
  124. }
  125. // TODO: backoff
  126. }
  127. }
  128. func httpPost(c *http.Client, url string, data []byte) bool {
  129. resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
  130. if err != nil {
  131. elog.TODO()
  132. return false
  133. }
  134. resp.Body.Close()
  135. if resp.StatusCode != http.StatusNoContent {
  136. elog.TODO()
  137. return false
  138. }
  139. return true
  140. }