stream.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package rafthttp
  2. import (
  3. "fmt"
  4. "io"
  5. "log"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "path"
  10. "strconv"
  11. "sync"
  12. "time"
  13. "github.com/coreos/etcd/etcdserver/stats"
  14. "github.com/coreos/etcd/pkg/types"
  15. "github.com/coreos/etcd/raft/raftpb"
  16. )
  // streamType identifies which kind of raft stream a connection carries. Its
// value is also used as a path segment of the stream URL.
type streamType string

// Known stream types.
const (
	// streamTypeMessage carries general raft messages.
	streamTypeMessage streamType = "message"
	// streamTypeMsgApp carries MsgApp messages; such a stream is bound to the
	// raft term announced when it is established.
	streamTypeMsgApp streamType = "msgapp"
)

// streamBufSize is the capacity of a streamWriter's outgoing message queue.
const streamBufSize = 4096
  23. var (
  24. // linkHeartbeatMessage is a special message used as heartbeat message in
  25. // link layer. It never conflicts with messages from raft because raft
  26. // doesn't send out messages without From and To fields.
  27. linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
  28. )
  29. func isLinkHeartbeatMessage(m raftpb.Message) bool {
  30. return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
  31. }
  32. type outgoingConn struct {
  33. t streamType
  34. termStr string
  35. io.Writer
  36. http.Flusher
  37. io.Closer
  38. }
  39. // streamWriter is a long-running worker that writes messages into the
  40. // attached outgoingConn.
  41. type streamWriter struct {
  42. fs *stats.FollowerStats
  43. mu sync.Mutex // guard field working and closer
  44. closer io.Closer
  45. working bool
  46. msgc chan raftpb.Message
  47. connc chan *outgoingConn
  48. stopc chan struct{}
  49. done chan struct{}
  50. }
  51. func startStreamWriter(fs *stats.FollowerStats) *streamWriter {
  52. w := &streamWriter{
  53. fs: fs,
  54. msgc: make(chan raftpb.Message, streamBufSize),
  55. connc: make(chan *outgoingConn),
  56. stopc: make(chan struct{}),
  57. done: make(chan struct{}),
  58. }
  59. go w.run()
  60. return w
  61. }
  62. func (cw *streamWriter) run() {
  63. var msgc chan raftpb.Message
  64. var heartbeatc <-chan time.Time
  65. var t streamType
  66. var msgAppTerm uint64
  67. var enc encoder
  68. var flusher http.Flusher
  69. tickc := time.Tick(ConnReadTimeout / 3)
  70. for {
  71. select {
  72. case <-heartbeatc:
  73. if err := enc.encode(linkHeartbeatMessage); err != nil {
  74. log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
  75. cw.resetCloser()
  76. heartbeatc, msgc = nil, nil
  77. continue
  78. }
  79. flusher.Flush()
  80. case m := <-msgc:
  81. if t == streamTypeMsgApp && m.Term != msgAppTerm {
  82. // TODO: reasonable retry logic
  83. if m.Term > msgAppTerm {
  84. cw.resetCloser()
  85. heartbeatc, msgc = nil, nil
  86. }
  87. continue
  88. }
  89. if err := enc.encode(m); err != nil {
  90. log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
  91. cw.resetCloser()
  92. heartbeatc, msgc = nil, nil
  93. continue
  94. }
  95. flusher.Flush()
  96. case conn := <-cw.connc:
  97. cw.resetCloser()
  98. t = conn.t
  99. switch conn.t {
  100. case streamTypeMsgApp:
  101. var err error
  102. msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64)
  103. if err != nil {
  104. log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err)
  105. }
  106. enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
  107. case streamTypeMessage:
  108. enc = &messageEncoder{w: conn.Writer}
  109. default:
  110. log.Panicf("rafthttp: unhandled stream type %s", conn.t)
  111. }
  112. flusher = conn.Flusher
  113. cw.mu.Lock()
  114. cw.closer = conn.Closer
  115. cw.working = true
  116. cw.mu.Unlock()
  117. heartbeatc, msgc = tickc, cw.msgc
  118. case <-cw.stopc:
  119. cw.resetCloser()
  120. close(cw.done)
  121. return
  122. }
  123. }
  124. }
  125. func (cw *streamWriter) isWorking() bool {
  126. cw.mu.Lock()
  127. defer cw.mu.Unlock()
  128. return cw.working
  129. }
  130. func (cw *streamWriter) resetCloser() {
  131. cw.mu.Lock()
  132. defer cw.mu.Unlock()
  133. if cw.working {
  134. cw.closer.Close()
  135. }
  136. cw.working = false
  137. }
  138. func (cw *streamWriter) attach(conn *outgoingConn) bool {
  139. select {
  140. case cw.connc <- conn:
  141. return true
  142. case <-cw.done:
  143. return false
  144. }
  145. }
  146. func (cw *streamWriter) stop() {
  147. close(cw.stopc)
  148. <-cw.done
  149. }
  150. // streamReader is a long-running go-routine that dials to the remote stream
  151. // endponit and reads messages from the response body returned.
  152. type streamReader struct {
  153. tr http.RoundTripper
  154. u string
  155. t streamType
  156. from, to types.ID
  157. cid types.ID
  158. recvc chan<- raftpb.Message
  159. mu sync.Mutex
  160. msgAppTerm uint64
  161. req *http.Request
  162. closer io.Closer
  163. stopc chan struct{}
  164. done chan struct{}
  165. }
  166. func startStreamReader(tr http.RoundTripper, u string, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader {
  167. r := &streamReader{
  168. tr: tr,
  169. u: u,
  170. t: t,
  171. from: from,
  172. to: to,
  173. cid: cid,
  174. recvc: recvc,
  175. stopc: make(chan struct{}),
  176. done: make(chan struct{}),
  177. }
  178. go r.run()
  179. return r
  180. }
  181. func (cr *streamReader) run() {
  182. for {
  183. rc, err := cr.roundtrip()
  184. if err != nil {
  185. log.Printf("rafthttp: roundtripping error: %v", err)
  186. } else {
  187. err := cr.decodeLoop(rc)
  188. if err != io.EOF && !isClosedConnectionError(err) {
  189. log.Printf("rafthttp: failed to read message on stream %s due to %v", cr.t, err)
  190. }
  191. }
  192. select {
  193. // Wait 100ms to create a new stream, so it doesn't bring too much
  194. // overhead when retry.
  195. case <-time.After(100 * time.Millisecond):
  196. case <-cr.stopc:
  197. close(cr.done)
  198. return
  199. }
  200. }
  201. }
  202. func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
  203. var dec decoder
  204. cr.mu.Lock()
  205. switch cr.t {
  206. case streamTypeMsgApp:
  207. dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm}
  208. case streamTypeMessage:
  209. dec = &messageDecoder{r: rc}
  210. default:
  211. log.Panicf("rafthttp: unhandled stream type %s", cr.t)
  212. }
  213. cr.closer = rc
  214. cr.mu.Unlock()
  215. for {
  216. m, err := dec.decode()
  217. switch {
  218. case err != nil:
  219. cr.mu.Lock()
  220. cr.resetCloser()
  221. cr.mu.Unlock()
  222. return err
  223. case isLinkHeartbeatMessage(m):
  224. // do nothing for linkHeartbeatMessage
  225. default:
  226. select {
  227. case cr.recvc <- m:
  228. default:
  229. log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked",
  230. m.Type, m.From)
  231. }
  232. }
  233. }
  234. }
  235. func (cr *streamReader) update(u string) {
  236. cr.mu.Lock()
  237. defer cr.mu.Unlock()
  238. cr.u = u
  239. cr.resetCloser()
  240. }
  241. func (cr *streamReader) updateMsgAppTerm(term uint64) {
  242. cr.mu.Lock()
  243. defer cr.mu.Unlock()
  244. if cr.msgAppTerm == term {
  245. return
  246. }
  247. cr.msgAppTerm = term
  248. cr.resetCloser()
  249. }
  250. // TODO: always cancel in-flight dial and decode
  251. func (cr *streamReader) stop() {
  252. close(cr.stopc)
  253. cr.mu.Lock()
  254. cr.cancelRequest()
  255. cr.resetCloser()
  256. cr.mu.Unlock()
  257. <-cr.done
  258. }
  259. func (cr *streamReader) isWorking() bool {
  260. cr.mu.Lock()
  261. defer cr.mu.Unlock()
  262. return cr.closer != nil
  263. }
  264. func (cr *streamReader) roundtrip() (io.ReadCloser, error) {
  265. cr.mu.Lock()
  266. u := cr.u
  267. term := cr.msgAppTerm
  268. cr.mu.Unlock()
  269. uu, err := url.Parse(u)
  270. if err != nil {
  271. return nil, fmt.Errorf("parse url %s error: %v", u, err)
  272. }
  273. uu.Path = path.Join(RaftStreamPrefix, string(cr.t), cr.from.String())
  274. req, err := http.NewRequest("GET", uu.String(), nil)
  275. if err != nil {
  276. return nil, fmt.Errorf("new request to %s error: %v", u, err)
  277. }
  278. req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
  279. req.Header.Set("X-Raft-To", cr.to.String())
  280. if cr.t == streamTypeMsgApp {
  281. req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
  282. }
  283. cr.mu.Lock()
  284. cr.req = req
  285. cr.mu.Unlock()
  286. resp, err := cr.tr.RoundTrip(req)
  287. if err != nil {
  288. return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err)
  289. }
  290. if resp.StatusCode != http.StatusOK {
  291. resp.Body.Close()
  292. return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
  293. }
  294. return resp.Body, nil
  295. }
  296. func (cr *streamReader) cancelRequest() {
  297. if canceller, ok := cr.tr.(*http.Transport); ok {
  298. canceller.CancelRequest(cr.req)
  299. }
  300. }
  301. func (cr *streamReader) resetCloser() {
  302. if cr.closer != nil {
  303. cr.closer.Close()
  304. }
  305. cr.closer = nil
  306. }
  307. func canUseMsgAppStream(m raftpb.Message) bool {
  308. return m.Type == raftpb.MsgApp && m.Term == m.LogTerm
  309. }
  // isClosedConnectionError reports whether err is a *net.OpError whose
// underlying error is "use of closed network connection". Older Go releases do
// not export that error value, so it is matched by message.
//
// A nil error, a typed-nil *net.OpError, or an OpError with a nil Err is
// reported as false instead of panicking.
func isClosedConnectionError(err error) bool {
	operr, ok := err.(*net.OpError)
	if !ok || operr == nil || operr.Err == nil {
		return false
	}
	return operr.Err.Error() == "use of closed network connection"
}