streamer.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rafthttp
  14. import (
  15. "errors"
  16. "fmt"
  17. "io"
  18. "log"
  19. "math"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "strconv"
  24. "sync"
  25. "time"
  26. "github.com/coreos/etcd/etcdserver/stats"
  27. "github.com/coreos/etcd/pkg/types"
  28. "github.com/coreos/etcd/raft/raftpb"
  29. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  30. )
  // streamBufSize is the capacity of a streamWriter's outgoing queue, counted
  // in batches of entries (one batch per send call).
  const streamBufSize = 4096
  34. // TODO: a stream might hava one stream server or one stream client, but not both.
  35. type stream struct {
  36. sync.Mutex
  37. w *streamWriter
  38. r *streamReader
  39. stopped bool
  40. }
  41. func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error {
  42. rd, err := newStreamReader(from, to, cid, term, tr, u, r)
  43. if err != nil {
  44. log.Printf("stream: error opening stream: %v", err)
  45. return err
  46. }
  47. s.Lock()
  48. defer s.Unlock()
  49. if s.stopped {
  50. rd.stop()
  51. return errors.New("stream: stopped")
  52. }
  53. if s.r != nil {
  54. panic("open: stream is open")
  55. }
  56. s.r = rd
  57. return nil
  58. }
  59. func (s *stream) attach(sw *streamWriter) error {
  60. s.Lock()
  61. defer s.Unlock()
  62. if s.stopped {
  63. return errors.New("stream: stopped")
  64. }
  65. if s.w != nil {
  66. // ignore lower-term streaming request
  67. if sw.term < s.w.term {
  68. return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
  69. }
  70. s.w.stop()
  71. }
  72. s.w = sw
  73. return nil
  74. }
  75. func (s *stream) write(m raftpb.Message) bool {
  76. s.Lock()
  77. defer s.Unlock()
  78. if s.stopped {
  79. return false
  80. }
  81. if s.w == nil {
  82. return false
  83. }
  84. if m.Term != s.w.term {
  85. if m.Term > s.w.term {
  86. panic("expected server to be invalidated when there is a higher term message")
  87. }
  88. return false
  89. }
  90. // todo: early unlock?
  91. if err := s.w.send(m.Entries); err != nil {
  92. log.Printf("stream: error sending message: %v", err)
  93. log.Printf("stream: stopping the stream server...")
  94. s.w.stop()
  95. s.w = nil
  96. return false
  97. }
  98. return true
  99. }
  100. // invalidate stops the sever/client that is running at
  101. // a term lower than the given term.
  102. func (s *stream) invalidate(term uint64) {
  103. s.Lock()
  104. defer s.Unlock()
  105. if s.w != nil {
  106. if s.w.term < term {
  107. s.w.stop()
  108. s.w = nil
  109. }
  110. }
  111. if s.r != nil {
  112. if s.r.term < term {
  113. s.r.stop()
  114. s.r = nil
  115. }
  116. }
  117. if term == math.MaxUint64 {
  118. s.stopped = true
  119. }
  120. }
  121. func (s *stream) stop() {
  122. s.invalidate(math.MaxUint64)
  123. }
  124. func (s *stream) isOpen() bool {
  125. s.Lock()
  126. defer s.Unlock()
  127. if s.r != nil && s.r.isStopped() {
  128. s.r = nil
  129. }
  130. return s.r != nil
  131. }
  // WriteFlusher is an io.Writer that also implements http.Flusher, so a
  // streamWriter can push each written batch out immediately with Flush.
  type WriteFlusher interface {
  	http.Flusher
  	io.Writer
  }
  136. // TODO: replace fs with stream stats
  137. type streamWriter struct {
  138. to types.ID
  139. term uint64
  140. fs *stats.FollowerStats
  141. q chan []raftpb.Entry
  142. done chan struct{}
  143. }
  144. // newStreamServer starts and returns a new started stream server.
  145. // The caller should call stop when finished, to shut it down.
  146. func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter {
  147. s := &streamWriter{
  148. to: to,
  149. term: term,
  150. q: make(chan []raftpb.Entry, streamBufSize),
  151. done: make(chan struct{}),
  152. }
  153. go s.handle(w)
  154. return s
  155. }
  156. func (s *streamWriter) send(ents []raftpb.Entry) error {
  157. select {
  158. case <-s.done:
  159. return fmt.Errorf("stopped")
  160. default:
  161. }
  162. select {
  163. case s.q <- ents:
  164. return nil
  165. default:
  166. log.Printf("rafthttp: maximum number of stream buffer entries to %d has been reached", s.to)
  167. return fmt.Errorf("maximum number of stream buffer entries has been reached")
  168. }
  169. }
  170. func (s *streamWriter) handle(w WriteFlusher) {
  171. defer func() {
  172. close(s.done)
  173. log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
  174. }()
  175. ew := &entryWriter{w: w}
  176. for ents := range s.q {
  177. start := time.Now()
  178. if err := ew.writeEntries(ents); err != nil {
  179. log.Printf("rafthttp: encountered error writing to server log stream: %v", err)
  180. return
  181. }
  182. w.Flush()
  183. s.fs.Succ(time.Since(start))
  184. }
  185. }
  186. func (s *streamWriter) stop() {
  187. close(s.q)
  188. <-s.done
  189. }
  190. func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
  191. // TODO: move the raft interface out of the reader.
  192. type streamReader struct {
  193. id types.ID
  194. to types.ID
  195. term uint64
  196. r Raft
  197. closer io.Closer
  198. done chan struct{}
  199. }
  200. // newStreamClient starts and returns a new started stream client.
  201. // The caller should call stop when finished, to shut it down.
  202. func newStreamReader(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamReader, error) {
  203. s := &streamReader{
  204. id: id,
  205. to: to,
  206. term: term,
  207. r: r,
  208. done: make(chan struct{}),
  209. }
  210. uu, err := url.Parse(u)
  211. if err != nil {
  212. return nil, fmt.Errorf("parse url %s error: %v", u, err)
  213. }
  214. uu.Path = path.Join(RaftStreamPrefix, s.id.String())
  215. req, err := http.NewRequest("GET", uu.String(), nil)
  216. if err != nil {
  217. return nil, fmt.Errorf("new request to %s error: %v", u, err)
  218. }
  219. req.Header.Set("X-Etcd-Cluster-ID", cid.String())
  220. req.Header.Set("X-Raft-To", s.to.String())
  221. req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
  222. resp, err := tr.RoundTrip(req)
  223. if err != nil {
  224. return nil, fmt.Errorf("error posting to %q: %v", u, err)
  225. }
  226. if resp.StatusCode != http.StatusOK {
  227. resp.Body.Close()
  228. return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
  229. }
  230. s.closer = resp.Body
  231. go s.handle(resp.Body)
  232. log.Printf("rafthttp: starting client stream to %s at term %d", s.to, s.term)
  233. return s, nil
  234. }
  235. func (s *streamReader) stop() {
  236. s.closer.Close()
  237. <-s.done
  238. }
  239. func (s *streamReader) isStopped() bool {
  240. select {
  241. case <-s.done:
  242. return true
  243. default:
  244. return false
  245. }
  246. }
  247. func (s *streamReader) handle(r io.Reader) {
  248. defer func() {
  249. close(s.done)
  250. log.Printf("rafthttp: client streaming to %s at term %d has been stopped", s.to, s.term)
  251. }()
  252. er := &entryReader{r: r}
  253. for {
  254. ents, err := er.readEntries()
  255. if err != nil {
  256. if err != io.EOF {
  257. log.Printf("rafthttp: encountered error reading the client log stream: %v", err)
  258. }
  259. return
  260. }
  261. // Considering Commit in MsgApp is not recovered, zero-entry appendEntry
  262. // messages have no use to raft state machine. Drop it here because
  263. // we don't have easy way to recover its Index easily.
  264. if len(ents) == 0 {
  265. continue
  266. }
  267. // The commit index field in appendEntry message is not recovered.
  268. // The follower updates its commit index through heartbeat.
  269. msg := raftpb.Message{
  270. Type: raftpb.MsgApp,
  271. From: uint64(s.to),
  272. To: uint64(s.id),
  273. Term: s.term,
  274. LogTerm: s.term,
  275. Index: ents[0].Index - 1,
  276. Entries: ents,
  277. }
  278. if err := s.r.Process(context.TODO(), msg); err != nil {
  279. log.Printf("rafthttp: process raft message error: %v", err)
  280. return
  281. }
  282. }
  283. }
  284. func shouldInitStream(m raftpb.Message) bool {
  285. return m.Type == raftpb.MsgAppResp && m.Reject == false
  286. }
  287. func canUseStream(m raftpb.Message) bool {
  288. return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
  289. }