streamer.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rafthttp
  14. import (
  15. "fmt"
  16. "io"
  17. "log"
  18. "net/http"
  19. "net/url"
  20. "path"
  21. "strconv"
  22. "time"
  23. "github.com/coreos/etcd/etcdserver/stats"
  24. "github.com/coreos/etcd/pkg/types"
  25. "github.com/coreos/etcd/raft/raftpb"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  27. )
  28. const (
  29. streamBufSize = 1024
  30. )
  31. type WriteFlusher interface {
  32. io.Writer
  33. http.Flusher
  34. }
  35. type streamServer struct {
  36. to types.ID
  37. term uint64
  38. fs *stats.FollowerStats
  39. q chan []raftpb.Entry
  40. done chan struct{}
  41. }
  42. func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.FollowerStats) *streamServer {
  43. s := &streamServer{
  44. to: to,
  45. term: term,
  46. fs: fs,
  47. q: make(chan []raftpb.Entry, streamBufSize),
  48. done: make(chan struct{}),
  49. }
  50. go s.handle(w)
  51. log.Printf("rafthttp: stream server to %s at term %d starts", to, term)
  52. return s
  53. }
  54. func (s *streamServer) send(ents []raftpb.Entry) error {
  55. select {
  56. case <-s.done:
  57. return fmt.Errorf("stopped")
  58. default:
  59. }
  60. select {
  61. case s.q <- ents:
  62. return nil
  63. default:
  64. log.Printf("rafthttp: streamer reaches maximal serving to %s", s.to)
  65. return fmt.Errorf("reach maximal serving")
  66. }
  67. }
  68. func (s *streamServer) stop() {
  69. close(s.q)
  70. <-s.done
  71. }
  72. func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
  73. func (s *streamServer) handle(w WriteFlusher) {
  74. defer func() {
  75. close(s.done)
  76. log.Printf("rafthttp: stream server to %s at term %d is closed", s.to, s.term)
  77. }()
  78. ew := &entryWriter{w: w}
  79. for ents := range s.q {
  80. start := time.Now()
  81. if err := ew.writeEntries(ents); err != nil {
  82. log.Printf("rafthttp: write ents error: %v", err)
  83. return
  84. }
  85. w.Flush()
  86. s.fs.Succ(time.Since(start))
  87. }
  88. }
  89. type streamClient struct {
  90. id types.ID
  91. to types.ID
  92. term uint64
  93. p Processor
  94. closer io.Closer
  95. done chan struct{}
  96. }
  97. func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient {
  98. return &streamClient{
  99. id: id,
  100. to: to,
  101. term: term,
  102. p: p,
  103. done: make(chan struct{}),
  104. }
  105. }
  106. // Dial dials to the remote url, and sends streaming request. If it succeeds,
  107. // it returns nil error, and the caller should call Handle function to keep
  108. // receiving appendEntry messages.
  109. func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error {
  110. uu, err := url.Parse(u)
  111. if err != nil {
  112. return fmt.Errorf("parse url %s error: %v", u, err)
  113. }
  114. uu.Path = path.Join(RaftStreamPrefix, s.id.String())
  115. req, err := http.NewRequest("GET", uu.String(), nil)
  116. if err != nil {
  117. return fmt.Errorf("new request to %s error: %v", u, err)
  118. }
  119. req.Header.Set("X-Etcd-Cluster-ID", cid.String())
  120. req.Header.Set("X-Raft-To", s.to.String())
  121. req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
  122. resp, err := tr.RoundTrip(req)
  123. if err != nil {
  124. return fmt.Errorf("error posting to %q: %v", u, err)
  125. }
  126. if resp.StatusCode != http.StatusOK {
  127. resp.Body.Close()
  128. return fmt.Errorf("unhandled http status %d", resp.StatusCode)
  129. }
  130. s.closer = resp.Body
  131. go s.handle(resp.Body)
  132. log.Printf("rafthttp: stream client to %s at term %d starts", s.to, s.term)
  133. return nil
  134. }
  135. func (s *streamClient) stop() {
  136. s.closer.Close()
  137. <-s.done
  138. }
  139. func (s *streamClient) isStopped() bool {
  140. select {
  141. case <-s.done:
  142. return true
  143. default:
  144. return false
  145. }
  146. }
  147. func (s *streamClient) handle(r io.Reader) {
  148. defer func() {
  149. close(s.done)
  150. log.Printf("rafthttp: stream client to %s at term %d is closed", s.to, s.term)
  151. }()
  152. er := &entryReader{r: r}
  153. for {
  154. ents, err := er.readEntries()
  155. if err != nil {
  156. if err != io.EOF {
  157. log.Printf("rafthttp: read ents error: %v", err)
  158. }
  159. return
  160. }
  161. // Considering Commit in MsgApp is not recovered, zero-entry appendEntry
  162. // messages have no use to raft state machine. Drop it here because
  163. // we don't have easy way to recover its Index easily.
  164. if len(ents) == 0 {
  165. continue
  166. }
  167. // The commit index field in appendEntry message is not recovered.
  168. // The follower updates its commit index through heartbeat.
  169. msg := raftpb.Message{
  170. Type: raftpb.MsgApp,
  171. From: uint64(s.to),
  172. To: uint64(s.id),
  173. Term: s.term,
  174. LogTerm: s.term,
  175. Index: ents[0].Index - 1,
  176. Entries: ents,
  177. }
  178. if err := s.p.Process(context.TODO(), msg); err != nil {
  179. log.Printf("rafthttp: process raft message error: %v", err)
  180. return
  181. }
  182. }
  183. }
  184. func shouldInitStream(m raftpb.Message) bool {
  185. return m.Type == raftpb.MsgAppResp && m.Reject == false
  186. }
  187. func canUseStream(m raftpb.Message) bool {
  188. return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
  189. }