streamer.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rafthttp
  14. import (
  15. "fmt"
  16. "io"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "path"
  21. "strconv"
  22. "time"
  23. "github.com/coreos/etcd/etcdserver/stats"
  24. "github.com/coreos/etcd/pkg/types"
  25. "github.com/coreos/etcd/raft/raftpb"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  27. )
  28. const (
  29. streamBufSize = 1024
  30. )
  31. type WriteFlusher interface {
  32. io.Writer
  33. http.Flusher
  34. }
  35. type streamServer struct {
  36. to types.ID
  37. term uint64
  38. fs *stats.FollowerStats
  39. q chan []raftpb.Entry
  40. done chan struct{}
  41. }
  42. func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.FollowerStats) *streamServer {
  43. s := &streamServer{
  44. to: to,
  45. term: term,
  46. fs: fs,
  47. q: make(chan []raftpb.Entry, streamBufSize),
  48. done: make(chan struct{}),
  49. }
  50. go s.handle(w)
  51. return s
  52. }
  53. func (s *streamServer) send(ents []raftpb.Entry) error {
  54. select {
  55. case <-s.done:
  56. return fmt.Errorf("stopped")
  57. default:
  58. }
  59. select {
  60. case s.q <- ents:
  61. return nil
  62. default:
  63. log.Printf("rafthttp: streamer reaches maximal serving to %s", s.to)
  64. return fmt.Errorf("reach maximal serving")
  65. }
  66. }
  67. func (s *streamServer) stop() {
  68. close(s.q)
  69. <-s.done
  70. }
  71. func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
  72. func (s *streamServer) handle(w WriteFlusher) {
  73. defer close(s.done)
  74. ew := &entryWriter{w: w}
  75. for ents := range s.q {
  76. start := time.Now()
  77. if err := ew.writeEntries(ents); err != nil {
  78. log.Printf("rafthttp: write ents error: %v", err)
  79. return
  80. }
  81. w.Flush()
  82. s.fs.Succ(time.Since(start))
  83. }
  84. }
  85. type streamClient struct {
  86. id types.ID
  87. to types.ID
  88. term uint64
  89. p Processor
  90. closer io.Closer
  91. done chan struct{}
  92. }
  93. func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient {
  94. return &streamClient{
  95. id: id,
  96. to: to,
  97. term: term,
  98. p: p,
  99. done: make(chan struct{}),
  100. }
  101. }
  102. // Dial dials to the remote url, and sends streaming request. If it succeeds,
  103. // it returns nil error, and the caller should call Handle function to keep
  104. // receiving appendEntry messages.
  105. func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error {
  106. uu, err := url.Parse(u)
  107. if err != nil {
  108. return fmt.Errorf("parse url %s error: %v", u, err)
  109. }
  110. uu.Path = path.Join(RaftStreamPrefix, s.id.String())
  111. req, err := http.NewRequest("GET", uu.String(), nil)
  112. if err != nil {
  113. return fmt.Errorf("new request to %s error: %v", u, err)
  114. }
  115. req.Header.Set("X-Etcd-Cluster-ID", cid.String())
  116. req.Header.Set("X-Raft-To", s.to.String())
  117. req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
  118. resp, err := tr.RoundTrip(req)
  119. if err != nil {
  120. return fmt.Errorf("error posting to %q: %v", u, err)
  121. }
  122. if resp.StatusCode != http.StatusOK {
  123. resp.Body.Close()
  124. return fmt.Errorf("unhandled http status %d", resp.StatusCode)
  125. }
  126. s.closer = resp.Body
  127. go s.handle(resp.Body)
  128. return nil
  129. }
  130. func (s *streamClient) stop() {
  131. s.closer.Close()
  132. <-s.done
  133. }
  134. func (s *streamClient) isStopped() bool {
  135. select {
  136. case <-s.done:
  137. return true
  138. default:
  139. return false
  140. }
  141. }
  142. func (s *streamClient) handle(r io.Reader) {
  143. defer close(s.done)
  144. er := &entryReader{r: r}
  145. for {
  146. ents, err := er.readEntries()
  147. if err != nil {
  148. if err != io.EOF {
  149. log.Printf("rafthttp: read ents error: %v", err)
  150. }
  151. return
  152. }
  153. // Considering Commit in MsgApp is not recovered, zero-entry appendEntry
  154. // messages have no use to raft state machine. Drop it here because
  155. // we don't have easy way to recover its Index easily.
  156. if len(ents) == 0 {
  157. continue
  158. }
  159. // The commit index field in appendEntry message is not recovered.
  160. // The follower updates its commit index through heartbeat.
  161. msg := raftpb.Message{
  162. Type: raftpb.MsgApp,
  163. From: uint64(s.to),
  164. To: uint64(s.id),
  165. Term: s.term,
  166. LogTerm: s.term,
  167. Index: ents[0].Index - 1,
  168. Entries: ents,
  169. }
  170. if err := s.p.Process(context.TODO(), msg); err != nil {
  171. log.Printf("rafthttp: process raft message error: %v", err)
  172. return
  173. }
  174. }
  175. }
  176. func shouldInitStream(m raftpb.Message) bool {
  177. return m.Type == raftpb.MsgAppResp && m.Reject == false
  178. }
  179. func canUseStream(m raftpb.Message) bool {
  180. return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
  181. }