sender.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rafthttp
  14. import (
  15. "bytes"
  16. "fmt"
  17. "log"
  18. "net/http"
  19. "sync"
  20. "time"
  21. "github.com/coreos/etcd/etcdserver/stats"
  22. "github.com/coreos/etcd/pkg/pbutil"
  23. "github.com/coreos/etcd/pkg/types"
  24. "github.com/coreos/etcd/raft/raftpb"
  25. )
  // connPerSender is the number of goroutines NewSender starts to drain a
  // sender's message queue.
  const connPerSender = 4

  // senderBufSize is the capacity of a sender's message queue.
  const senderBufSize = connPerSender * 4
  30. type Sender interface {
  31. // StartStreaming enables streaming in the sender using the given writer,
  32. // which provides a fast and effecient way to send appendEntry messages.
  33. StartStreaming(w WriteFlusher, to types.ID, term uint64) (done <-chan struct{}, err error)
  34. Update(u string)
  35. // Send sends the data to the remote node. It is always non-blocking.
  36. // It may be fail to send data if it returns nil error.
  37. Send(m raftpb.Message) error
  38. // Stop performs any necessary finalization and terminates the Sender
  39. // elegantly.
  40. Stop()
  41. }
  42. func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
  43. s := &sender{
  44. tr: tr,
  45. u: u,
  46. cid: cid,
  47. p: p,
  48. fs: fs,
  49. shouldstop: shouldstop,
  50. q: make(chan []byte, senderBufSize),
  51. }
  52. s.wg.Add(connPerSender)
  53. for i := 0; i < connPerSender; i++ {
  54. go s.handle()
  55. }
  56. return s
  57. }
  58. type sender struct {
  59. tr http.RoundTripper
  60. u string
  61. cid types.ID
  62. p Processor
  63. fs *stats.FollowerStats
  64. shouldstop chan struct{}
  65. strmCln *streamClient
  66. strmSrv *streamServer
  67. strmSrvMu sync.Mutex
  68. q chan []byte
  69. mu sync.RWMutex
  70. wg sync.WaitGroup
  71. }
  72. func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
  73. s.strmSrvMu.Lock()
  74. defer s.strmSrvMu.Unlock()
  75. if s.strmSrv != nil {
  76. // ignore lower-term streaming request
  77. if term < s.strmSrv.term {
  78. return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, s.strmSrv.term)
  79. }
  80. // stop the existing one
  81. s.strmSrv.stop()
  82. }
  83. s.strmSrv = startStreamServer(w, to, term, s.fs)
  84. return s.strmSrv.stopNotify(), nil
  85. }
  86. func (s *sender) Update(u string) {
  87. s.mu.Lock()
  88. defer s.mu.Unlock()
  89. s.u = u
  90. }
  91. // TODO (xiangli): reasonable retry logic
  92. func (s *sender) Send(m raftpb.Message) error {
  93. s.maybeStopStream(m.Term)
  94. if !s.hasStreamClient() && shouldInitStream(m) {
  95. s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
  96. }
  97. if canUseStream(m) {
  98. if ok := s.tryStream(m); ok {
  99. return nil
  100. }
  101. }
  102. // TODO: don't block. we should be able to have 1000s
  103. // of messages out at a time.
  104. data := pbutil.MustMarshal(&m)
  105. select {
  106. case s.q <- data:
  107. return nil
  108. default:
  109. log.Printf("sender: reach the maximal serving to %s", s.u)
  110. return fmt.Errorf("reach maximal serving")
  111. }
  112. }
  113. func (s *sender) Stop() {
  114. close(s.q)
  115. s.wg.Wait()
  116. s.strmSrvMu.Lock()
  117. if s.strmSrv != nil {
  118. s.strmSrv.stop()
  119. }
  120. s.strmSrvMu.Unlock()
  121. if s.strmCln != nil {
  122. s.strmCln.stop()
  123. }
  124. }
  125. func (s *sender) maybeStopStream(term uint64) {
  126. if s.strmCln != nil && term > s.strmCln.term {
  127. s.strmCln.stop()
  128. s.strmCln = nil
  129. }
  130. s.strmSrvMu.Lock()
  131. defer s.strmSrvMu.Unlock()
  132. if s.strmSrv != nil && term > s.strmSrv.term {
  133. s.strmSrv.stop()
  134. s.strmSrv = nil
  135. }
  136. }
  137. func (s *sender) hasStreamClient() bool {
  138. return s.strmCln != nil && !s.strmCln.isStopped()
  139. }
  140. func (s *sender) initStream(from, to types.ID, term uint64) {
  141. strmCln := newStreamClient(from, to, term, s.p)
  142. s.mu.Lock()
  143. u := s.u
  144. s.mu.Unlock()
  145. if err := strmCln.start(s.tr, u, s.cid); err != nil {
  146. log.Printf("rafthttp: start stream client error: %v", err)
  147. return
  148. }
  149. s.strmCln = strmCln
  150. log.Printf("rafthttp: start stream client with %s in term %d", to, term)
  151. }
  152. func (s *sender) tryStream(m raftpb.Message) bool {
  153. s.strmSrvMu.Lock()
  154. defer s.strmSrvMu.Unlock()
  155. if s.strmSrv == nil || m.Term != s.strmSrv.term {
  156. return false
  157. }
  158. if err := s.strmSrv.send(m.Entries); err != nil {
  159. log.Printf("rafthttp: send stream message error: %v", err)
  160. s.strmSrv.stop()
  161. s.strmSrv = nil
  162. return false
  163. }
  164. return true
  165. }
  166. func (s *sender) handle() {
  167. defer s.wg.Done()
  168. for d := range s.q {
  169. start := time.Now()
  170. err := s.post(d)
  171. end := time.Now()
  172. if err != nil {
  173. s.fs.Fail()
  174. log.Printf("sender: %v", err)
  175. continue
  176. }
  177. s.fs.Succ(end.Sub(start))
  178. }
  179. }
  180. // post POSTs a data payload to a url. Returns nil if the POST succeeds,
  181. // error on any failure.
  182. func (s *sender) post(data []byte) error {
  183. s.mu.RLock()
  184. req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
  185. s.mu.RUnlock()
  186. if err != nil {
  187. return fmt.Errorf("new request to %s error: %v", s.u, err)
  188. }
  189. req.Header.Set("Content-Type", "application/protobuf")
  190. req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
  191. resp, err := s.tr.RoundTrip(req)
  192. if err != nil {
  193. return fmt.Errorf("error posting to %q: %v", req.URL.String(), err)
  194. }
  195. resp.Body.Close()
  196. switch resp.StatusCode {
  197. case http.StatusPreconditionFailed:
  198. select {
  199. case s.shouldstop <- struct{}{}:
  200. default:
  201. }
  202. log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
  203. return nil
  204. case http.StatusForbidden:
  205. select {
  206. case s.shouldstop <- struct{}{}:
  207. default:
  208. }
  209. log.Println("etcdserver: this member has been permanently removed from the cluster")
  210. log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
  211. return nil
  212. case http.StatusNoContent:
  213. return nil
  214. default:
  215. return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
  216. }
  217. }