sender.go 4.9 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "bytes"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "path"
  21. "sync"
  22. "time"
  23. "github.com/coreos/etcd/etcdserver/stats"
  24. "github.com/coreos/etcd/pkg/types"
  25. "github.com/coreos/etcd/raft/raftpb"
  26. )
  27. const (
  28. raftPrefix = "/raft"
  29. connPerSender = 4
  30. )
  31. type sendHub struct {
  32. tr *http.Transport
  33. cl ClusterInfo
  34. ss *stats.ServerStats
  35. ls *stats.LeaderStats
  36. senders map[types.ID]*sender
  37. }
  38. // newSendHub creates the default send hub used to transport raft messages
  39. // to other members. The returned sendHub will update the given ServerStats and
  40. // LeaderStats appropriately.
  41. func newSendHub(t *http.Transport, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
  42. h := &sendHub{
  43. tr: t,
  44. cl: cl,
  45. ss: ss,
  46. ls: ls,
  47. senders: make(map[types.ID]*sender),
  48. }
  49. for _, m := range cl.Members() {
  50. h.Add(m)
  51. }
  52. return h
  53. }
  54. func (h *sendHub) Send(msgs []raftpb.Message) {
  55. for _, m := range msgs {
  56. to := types.ID(m.To)
  57. s, ok := h.senders[to]
  58. if !ok {
  59. if !h.cl.IsIDRemoved(to) {
  60. log.Printf("etcdserver: send message to unknown receiver %s", to)
  61. }
  62. continue
  63. }
  64. // TODO: don't block. we should be able to have 1000s
  65. // of messages out at a time.
  66. data, err := m.Marshal()
  67. if err != nil {
  68. log.Println("sender: dropping message:", err)
  69. return // drop bad message
  70. }
  71. if m.Type == raftpb.MsgApp {
  72. h.ss.SendAppendReq(len(data))
  73. }
  74. // TODO (xiangli): reasonable retry logic
  75. s.send(data)
  76. }
  77. }
  78. func (h *sendHub) Stop() {
  79. for _, s := range h.senders {
  80. s.stop()
  81. }
  82. }
  83. func (h *sendHub) Add(m *Member) {
  84. if _, ok := h.senders[m.ID]; ok {
  85. return
  86. }
  87. // TODO: considering how to switch between all available peer urls
  88. u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
  89. c := &http.Client{Transport: h.tr}
  90. fs := h.ls.Follower(m.ID.String())
  91. s := newSender(u, h.cl.ID(), c, fs)
  92. h.senders[m.ID] = s
  93. }
  94. func (h *sendHub) Remove(id types.ID) {
  95. h.senders[id].stop()
  96. delete(h.senders, id)
  97. }
  98. func (h *sendHub) Update(m *Member) {
  99. // TODO: return error or just panic?
  100. if _, ok := h.senders[m.ID]; !ok {
  101. return
  102. }
  103. peerURL := m.PickPeerURL()
  104. u, err := url.Parse(peerURL)
  105. if err != nil {
  106. log.Panicf("unexpect peer url %s", peerURL)
  107. }
  108. u.Path = path.Join(u.Path, raftPrefix)
  109. s := h.senders[m.ID]
  110. s.mu.Lock()
  111. defer s.mu.Unlock()
  112. s.u = u.String()
  113. }
  114. type sender struct {
  115. u string
  116. cid types.ID
  117. c *http.Client
  118. fs *stats.FollowerStats
  119. q chan []byte
  120. mu sync.RWMutex
  121. }
  122. func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender {
  123. s := &sender{
  124. u: u,
  125. cid: cid,
  126. c: c,
  127. fs: fs,
  128. q: make(chan []byte),
  129. }
  130. for i := 0; i < connPerSender; i++ {
  131. go s.handle()
  132. }
  133. return s
  134. }
  135. func (s *sender) send(data []byte) {
  136. select {
  137. case s.q <- data:
  138. default:
  139. log.Printf("sender: reach the maximal serving to %s", s.u)
  140. }
  141. }
  142. func (s *sender) stop() {
  143. close(s.q)
  144. }
  145. func (s *sender) handle() {
  146. for d := range s.q {
  147. start := time.Now()
  148. err := s.post(d)
  149. end := time.Now()
  150. if err != nil {
  151. s.fs.Fail()
  152. log.Printf("sender: %v", err)
  153. continue
  154. }
  155. s.fs.Succ(end.Sub(start))
  156. }
  157. }
  158. // post POSTs a data payload to a url. Returns nil if the POST succeeds,
  159. // error on any failure.
  160. func (s *sender) post(data []byte) error {
  161. s.mu.RLock()
  162. req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
  163. s.mu.RUnlock()
  164. if err != nil {
  165. return fmt.Errorf("new request to %s error: %v", s.u, err)
  166. }
  167. req.Header.Set("Content-Type", "application/protobuf")
  168. req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
  169. resp, err := s.c.Do(req)
  170. if err != nil {
  171. return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
  172. }
  173. resp.Body.Close()
  174. switch resp.StatusCode {
  175. case http.StatusPreconditionFailed:
  176. // TODO: shutdown the etcdserver gracefully?
  177. log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
  178. return nil
  179. case http.StatusForbidden:
  180. // TODO: stop the server
  181. log.Println("etcd: this member has been permanently removed from the cluster")
  182. log.Fatalln("etcd: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
  183. return nil
  184. case http.StatusNoContent:
  185. return nil
  186. default:
  187. return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
  188. }
  189. }