http.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  15. import (
  16. "errors"
  17. "io/ioutil"
  18. "net/http"
  19. "path"
  20. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  21. pioutil "github.com/coreos/etcd/pkg/ioutil"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. "github.com/coreos/etcd/version"
  25. )
  26. const (
  27. ConnReadLimitByte = 64 * 1024
  28. )
  29. var (
  30. RaftPrefix = "/raft"
  31. RaftStreamPrefix = path.Join(RaftPrefix, "stream")
  32. errIncompatibleVersion = errors.New("incompatible version")
  33. errClusterIDMismatch = errors.New("cluster ID mismatch")
  34. )
  35. func NewHandler(r Raft, cid types.ID) http.Handler {
  36. return &handler{
  37. r: r,
  38. cid: cid,
  39. }
  40. }
  41. type peerGetter interface {
  42. Get(id types.ID) Peer
  43. }
  44. func newStreamHandler(peerGetter peerGetter, r Raft, id, cid types.ID) http.Handler {
  45. return &streamHandler{
  46. peerGetter: peerGetter,
  47. r: r,
  48. id: id,
  49. cid: cid,
  50. }
  51. }
  52. type writerToResponse interface {
  53. WriteTo(w http.ResponseWriter)
  54. }
  55. type handler struct {
  56. r Raft
  57. cid types.ID
  58. }
  59. func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  60. if r.Method != "POST" {
  61. w.Header().Set("Allow", "POST")
  62. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  63. return
  64. }
  65. if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
  66. plog.Errorf("request received was ignored (%v)", err)
  67. http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
  68. return
  69. }
  70. wcid := h.cid.String()
  71. w.Header().Set("X-Etcd-Cluster-ID", wcid)
  72. gcid := r.Header.Get("X-Etcd-Cluster-ID")
  73. if gcid != wcid {
  74. plog.Errorf("request received was ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
  75. http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
  76. return
  77. }
  78. // Limit the data size that could be read from the request body, which ensures that read from
  79. // connection will not time out accidentally due to possible block in underlying implementation.
  80. limitedr := pioutil.NewLimitedBufferReader(r.Body, ConnReadLimitByte)
  81. b, err := ioutil.ReadAll(limitedr)
  82. if err != nil {
  83. plog.Errorf("failed to read raft message (%v)", err)
  84. http.Error(w, "error reading raft message", http.StatusBadRequest)
  85. return
  86. }
  87. var m raftpb.Message
  88. if err := m.Unmarshal(b); err != nil {
  89. plog.Errorf("failed to unmarshal raft message (%v)", err)
  90. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  91. return
  92. }
  93. if err := h.r.Process(context.TODO(), m); err != nil {
  94. switch v := err.(type) {
  95. case writerToResponse:
  96. v.WriteTo(w)
  97. default:
  98. plog.Warningf("failed to process raft message (%v)", err)
  99. http.Error(w, "error processing raft message", http.StatusInternalServerError)
  100. }
  101. return
  102. }
  103. // Write StatusNoContet header after the message has been processed by
  104. // raft, which faciliates the client to report MsgSnap status.
  105. w.WriteHeader(http.StatusNoContent)
  106. }
  107. type streamHandler struct {
  108. peerGetter peerGetter
  109. r Raft
  110. id types.ID
  111. cid types.ID
  112. }
  113. func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  114. if r.Method != "GET" {
  115. w.Header().Set("Allow", "GET")
  116. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  117. return
  118. }
  119. w.Header().Set("X-Server-Version", version.Version)
  120. if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
  121. plog.Errorf("request received was ignored (%v)", err)
  122. http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
  123. return
  124. }
  125. wcid := h.cid.String()
  126. w.Header().Set("X-Etcd-Cluster-ID", wcid)
  127. if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid {
  128. plog.Errorf("streaming request ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
  129. http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
  130. return
  131. }
  132. var t streamType
  133. switch path.Dir(r.URL.Path) {
  134. // backward compatibility
  135. case RaftStreamPrefix:
  136. t = streamTypeMsgApp
  137. case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)):
  138. t = streamTypeMsgAppV2
  139. case path.Join(RaftStreamPrefix, string(streamTypeMessage)):
  140. t = streamTypeMessage
  141. default:
  142. plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
  143. http.Error(w, "invalid path", http.StatusNotFound)
  144. return
  145. }
  146. fromStr := path.Base(r.URL.Path)
  147. from, err := types.IDFromString(fromStr)
  148. if err != nil {
  149. plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
  150. http.Error(w, "invalid from", http.StatusNotFound)
  151. return
  152. }
  153. if h.r.IsIDRemoved(uint64(from)) {
  154. plog.Warningf("rejected the stream from peer %s since it was removed", from)
  155. http.Error(w, "removed member", http.StatusGone)
  156. return
  157. }
  158. p := h.peerGetter.Get(from)
  159. if p == nil {
  160. // This may happen in following cases:
  161. // 1. user starts a remote peer that belongs to a different cluster
  162. // with the same cluster ID.
  163. // 2. local etcd falls behind of the cluster, and cannot recognize
  164. // the members that joined after its current progress.
  165. plog.Errorf("failed to find member %s in cluster %s", from, wcid)
  166. http.Error(w, "error sender not found", http.StatusNotFound)
  167. return
  168. }
  169. wto := h.id.String()
  170. if gto := r.Header.Get("X-Raft-To"); gto != wto {
  171. plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
  172. http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
  173. return
  174. }
  175. w.WriteHeader(http.StatusOK)
  176. w.(http.Flusher).Flush()
  177. c := newCloseNotifier()
  178. conn := &outgoingConn{
  179. t: t,
  180. termStr: r.Header.Get("X-Raft-Term"),
  181. Writer: w,
  182. Flusher: w.(http.Flusher),
  183. Closer: c,
  184. }
  185. p.attachOutgoingConn(conn)
  186. <-c.closeNotify()
  187. }
  188. type closeNotifier struct {
  189. done chan struct{}
  190. }
  191. func newCloseNotifier() *closeNotifier {
  192. return &closeNotifier{
  193. done: make(chan struct{}),
  194. }
  195. }
  196. func (n *closeNotifier) Close() error {
  197. close(n.done)
  198. return nil
  199. }
  200. func (n *closeNotifier) closeNotify() <-chan struct{} { return n.done }