streamer.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rafthttp
  14. import (
  15. "fmt"
  16. "io"
  17. "log"
  18. "math"
  19. "net/http"
  20. "net/url"
  21. "path"
  22. "strconv"
  23. "sync"
  24. "time"
  25. "github.com/coreos/etcd/etcdserver/stats"
  26. "github.com/coreos/etcd/pkg/types"
  27. "github.com/coreos/etcd/raft/raftpb"
  28. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  29. )
  // streamBufSize is the capacity of a streamServer's outgoing queue. It is
// counted in batches of entries: each successful send enqueues one batch.
const streamBufSize = 4096
  33. // TODO: a stream might hava one stream server or one stream client, but not both.
  34. type stream struct {
  35. // the server might be attached asynchronously with the owner of the stream
  36. // use a mutex to protect it
  37. sync.Mutex
  38. server *streamServer
  39. client *streamClient
  40. }
  41. func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error {
  42. if s.client != nil {
  43. panic("open: stream is open")
  44. }
  45. c, err := newStreamClient(from, to, cid, term, tr, u, r)
  46. if err != nil {
  47. log.Printf("stream: error opening stream: %v", err)
  48. return err
  49. }
  50. s.client = c
  51. return nil
  52. }
  53. func (s *stream) attach(server *streamServer) error {
  54. s.Lock()
  55. defer s.Unlock()
  56. if s.server != nil {
  57. // ignore lower-term streaming request
  58. if server.term < s.server.term {
  59. return fmt.Errorf("cannot attach out of data stream server [%d / %d]", server.term, s.server.term)
  60. }
  61. s.server.stop()
  62. }
  63. s.server = server
  64. return nil
  65. }
  66. func (s *stream) write(m raftpb.Message) bool {
  67. s.Lock()
  68. defer s.Unlock()
  69. if s.server == nil {
  70. return false
  71. }
  72. if m.Term != s.server.term {
  73. if m.Term > s.server.term {
  74. panic("expected server to be invalidated when there is a higher term message")
  75. }
  76. return false
  77. }
  78. // todo: early unlock?
  79. if err := s.server.send(m.Entries); err != nil {
  80. log.Printf("stream: error sending message: %v", err)
  81. log.Printf("stream: stopping the stream server...")
  82. s.server.stop()
  83. s.server = nil
  84. return false
  85. }
  86. return true
  87. }
  88. // invalidate stops the sever/client that is running at
  89. // a term lower than the given term.
  90. func (s *stream) invalidate(term uint64) {
  91. s.Lock()
  92. defer s.Unlock()
  93. if s.server != nil {
  94. if s.server.term < term {
  95. s.server.stop()
  96. s.server = nil
  97. }
  98. }
  99. if s.client != nil {
  100. if s.client.term < term {
  101. s.client.stop()
  102. s.client = nil
  103. }
  104. }
  105. }
  106. func (s *stream) stop() {
  107. s.invalidate(math.MaxUint64)
  108. }
  109. func (s *stream) isOpen() bool {
  110. if s.client != nil && s.client.isStopped() {
  111. s.client = nil
  112. }
  113. return s.client != nil
  114. }
  // WriteFlusher is a writer whose buffered output can be pushed out
// explicitly. streamServer flushes it after every batch of entries it writes.
type WriteFlusher interface {
	http.Flusher
	io.Writer
}
  119. // TODO: rename it to streamWriter.
  120. // TODO: replace fs with stream stats
  121. type streamServer struct {
  122. to types.ID
  123. term uint64
  124. fs *stats.FollowerStats
  125. q chan []raftpb.Entry
  126. done chan struct{}
  127. }
  128. // newStreamServer starts and returns a new started stream server.
  129. // The caller should call stop when finished, to shut it down.
  130. func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer {
  131. s := &streamServer{
  132. to: to,
  133. term: term,
  134. q: make(chan []raftpb.Entry, streamBufSize),
  135. done: make(chan struct{}),
  136. }
  137. go s.handle(w)
  138. return s
  139. }
  140. func (s *streamServer) send(ents []raftpb.Entry) error {
  141. select {
  142. case <-s.done:
  143. return fmt.Errorf("stopped")
  144. default:
  145. }
  146. select {
  147. case s.q <- ents:
  148. return nil
  149. default:
  150. log.Printf("rafthttp: maximum number of stream buffer entries to %d has been reached", s.to)
  151. return fmt.Errorf("maximum number of stream buffer entries has been reached")
  152. }
  153. }
  154. func (s *streamServer) handle(w WriteFlusher) {
  155. defer func() {
  156. close(s.done)
  157. log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
  158. }()
  159. ew := &entryWriter{w: w}
  160. for ents := range s.q {
  161. start := time.Now()
  162. if err := ew.writeEntries(ents); err != nil {
  163. log.Printf("rafthttp: encountered error writing to server log stream: %v", err)
  164. return
  165. }
  166. w.Flush()
  167. s.fs.Succ(time.Since(start))
  168. }
  169. }
  170. func (s *streamServer) stop() {
  171. close(s.q)
  172. <-s.done
  173. }
  174. func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
  175. // TODO: rename it to streamReader.
  176. // TODO: move the raft interface out of the reader.
  177. type streamClient struct {
  178. id types.ID
  179. to types.ID
  180. term uint64
  181. r Raft
  182. closer io.Closer
  183. done chan struct{}
  184. }
  185. // newStreamClient starts and returns a new started stream client.
  186. // The caller should call stop when finished, to shut it down.
  187. func newStreamClient(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamClient, error) {
  188. s := &streamClient{
  189. id: id,
  190. to: to,
  191. term: term,
  192. r: r,
  193. done: make(chan struct{}),
  194. }
  195. uu, err := url.Parse(u)
  196. if err != nil {
  197. return nil, fmt.Errorf("parse url %s error: %v", u, err)
  198. }
  199. uu.Path = path.Join(RaftStreamPrefix, s.id.String())
  200. req, err := http.NewRequest("GET", uu.String(), nil)
  201. if err != nil {
  202. return nil, fmt.Errorf("new request to %s error: %v", u, err)
  203. }
  204. req.Header.Set("X-Etcd-Cluster-ID", cid.String())
  205. req.Header.Set("X-Raft-To", s.to.String())
  206. req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
  207. resp, err := tr.RoundTrip(req)
  208. if err != nil {
  209. return nil, fmt.Errorf("error posting to %q: %v", u, err)
  210. }
  211. if resp.StatusCode != http.StatusOK {
  212. resp.Body.Close()
  213. return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
  214. }
  215. s.closer = resp.Body
  216. go s.handle(resp.Body)
  217. log.Printf("rafthttp: starting client stream to %s at term %d", s.to, s.term)
  218. return s, nil
  219. }
  220. func (s *streamClient) stop() {
  221. s.closer.Close()
  222. <-s.done
  223. }
  224. func (s *streamClient) isStopped() bool {
  225. select {
  226. case <-s.done:
  227. return true
  228. default:
  229. return false
  230. }
  231. }
  232. func (s *streamClient) handle(r io.Reader) {
  233. defer func() {
  234. close(s.done)
  235. log.Printf("rafthttp: client streaming to %s at term %d has been stopped", s.to, s.term)
  236. }()
  237. er := &entryReader{r: r}
  238. for {
  239. ents, err := er.readEntries()
  240. if err != nil {
  241. if err != io.EOF {
  242. log.Printf("rafthttp: encountered error reading the client log stream: %v", err)
  243. }
  244. return
  245. }
  246. // Considering Commit in MsgApp is not recovered, zero-entry appendEntry
  247. // messages have no use to raft state machine. Drop it here because
  248. // we don't have easy way to recover its Index easily.
  249. if len(ents) == 0 {
  250. continue
  251. }
  252. // The commit index field in appendEntry message is not recovered.
  253. // The follower updates its commit index through heartbeat.
  254. msg := raftpb.Message{
  255. Type: raftpb.MsgApp,
  256. From: uint64(s.to),
  257. To: uint64(s.id),
  258. Term: s.term,
  259. LogTerm: s.term,
  260. Index: ents[0].Index - 1,
  261. Entries: ents,
  262. }
  263. if err := s.r.Process(context.TODO(), msg); err != nil {
  264. log.Printf("rafthttp: process raft message error: %v", err)
  265. return
  266. }
  267. }
  268. }
  269. func shouldInitStream(m raftpb.Message) bool {
  270. return m.Type == raftpb.MsgAppResp && m.Reject == false
  271. }
  272. func canUseStream(m raftpb.Message) bool {
  273. return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
  274. }