http_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  15. import (
  16. "bytes"
  17. "errors"
  18. "io"
  19. "net/http"
  20. "net/http/httptest"
  21. "strings"
  22. "testing"
  23. "time"
  24. "github.com/coreos/etcd/pkg/pbutil"
  25. "github.com/coreos/etcd/pkg/types"
  26. "github.com/coreos/etcd/raft/raftpb"
  27. )
  28. func TestServeRaftPrefix(t *testing.T) {
  29. testCases := []struct {
  30. method string
  31. body io.Reader
  32. p Raft
  33. clusterID string
  34. wcode int
  35. }{
  36. {
  37. // bad method
  38. "GET",
  39. bytes.NewReader(
  40. pbutil.MustMarshal(&raftpb.Message{}),
  41. ),
  42. &fakeRaft{},
  43. "0",
  44. http.StatusMethodNotAllowed,
  45. },
  46. {
  47. // bad method
  48. "PUT",
  49. bytes.NewReader(
  50. pbutil.MustMarshal(&raftpb.Message{}),
  51. ),
  52. &fakeRaft{},
  53. "0",
  54. http.StatusMethodNotAllowed,
  55. },
  56. {
  57. // bad method
  58. "DELETE",
  59. bytes.NewReader(
  60. pbutil.MustMarshal(&raftpb.Message{}),
  61. ),
  62. &fakeRaft{},
  63. "0",
  64. http.StatusMethodNotAllowed,
  65. },
  66. {
  67. // bad request body
  68. "POST",
  69. &errReader{},
  70. &fakeRaft{},
  71. "0",
  72. http.StatusBadRequest,
  73. },
  74. {
  75. // bad request protobuf
  76. "POST",
  77. strings.NewReader("malformed garbage"),
  78. &fakeRaft{},
  79. "0",
  80. http.StatusBadRequest,
  81. },
  82. {
  83. // good request, wrong cluster ID
  84. "POST",
  85. bytes.NewReader(
  86. pbutil.MustMarshal(&raftpb.Message{}),
  87. ),
  88. &fakeRaft{},
  89. "1",
  90. http.StatusPreconditionFailed,
  91. },
  92. {
  93. // good request, Processor failure
  94. "POST",
  95. bytes.NewReader(
  96. pbutil.MustMarshal(&raftpb.Message{}),
  97. ),
  98. &fakeRaft{
  99. err: &resWriterToError{code: http.StatusForbidden},
  100. },
  101. "0",
  102. http.StatusForbidden,
  103. },
  104. {
  105. // good request, Processor failure
  106. "POST",
  107. bytes.NewReader(
  108. pbutil.MustMarshal(&raftpb.Message{}),
  109. ),
  110. &fakeRaft{
  111. err: &resWriterToError{code: http.StatusInternalServerError},
  112. },
  113. "0",
  114. http.StatusInternalServerError,
  115. },
  116. {
  117. // good request, Processor failure
  118. "POST",
  119. bytes.NewReader(
  120. pbutil.MustMarshal(&raftpb.Message{}),
  121. ),
  122. &fakeRaft{err: errors.New("blah")},
  123. "0",
  124. http.StatusInternalServerError,
  125. },
  126. {
  127. // good request
  128. "POST",
  129. bytes.NewReader(
  130. pbutil.MustMarshal(&raftpb.Message{}),
  131. ),
  132. &fakeRaft{},
  133. "0",
  134. http.StatusNoContent,
  135. },
  136. }
  137. for i, tt := range testCases {
  138. req, err := http.NewRequest(tt.method, "foo", tt.body)
  139. if err != nil {
  140. t.Fatalf("#%d: could not create request: %#v", i, err)
  141. }
  142. req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
  143. rw := httptest.NewRecorder()
  144. h := NewHandler(tt.p, types.ID(0))
  145. h.ServeHTTP(rw, req)
  146. if rw.Code != tt.wcode {
  147. t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
  148. }
  149. }
  150. }
  151. func TestServeRaftStreamPrefix(t *testing.T) {
  152. tests := []struct {
  153. path string
  154. wtype streamType
  155. }{
  156. {
  157. RaftStreamPrefix + "/message/1",
  158. streamTypeMessage,
  159. },
  160. {
  161. RaftStreamPrefix + "/msgapp/1",
  162. streamTypeMsgAppV2,
  163. },
  164. // backward compatibility
  165. {
  166. RaftStreamPrefix + "/1",
  167. streamTypeMsgApp,
  168. },
  169. }
  170. for i, tt := range tests {
  171. req, err := http.NewRequest("GET", "http://localhost:7001"+tt.path, nil)
  172. if err != nil {
  173. t.Fatalf("#%d: could not create request: %#v", i, err)
  174. }
  175. req.Header.Set("X-Etcd-Cluster-ID", "1")
  176. req.Header.Set("X-Raft-To", "2")
  177. wterm := "1"
  178. req.Header.Set("X-Raft-Term", wterm)
  179. peer := newFakePeer()
  180. peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
  181. h := newStreamHandler(peerGetter, types.ID(2), types.ID(1))
  182. rw := httptest.NewRecorder()
  183. go h.ServeHTTP(rw, req)
  184. var conn *outgoingConn
  185. select {
  186. case conn = <-peer.connc:
  187. case <-time.After(time.Second):
  188. t.Fatalf("#%d: failed to attach outgoingConn", i)
  189. }
  190. if conn.t != tt.wtype {
  191. t.Errorf("$%d: type = %s, want %s", i, conn.t, tt.wtype)
  192. }
  193. if conn.termStr != wterm {
  194. t.Errorf("$%d: term = %s, want %s", i, conn.termStr, wterm)
  195. }
  196. conn.Close()
  197. }
  198. }
  199. func TestServeRaftStreamPrefixBad(t *testing.T) {
  200. tests := []struct {
  201. method string
  202. path string
  203. clusterID string
  204. remote string
  205. wcode int
  206. }{
  207. // bad method
  208. {
  209. "PUT",
  210. RaftStreamPrefix + "/message/1",
  211. "1",
  212. "1",
  213. http.StatusMethodNotAllowed,
  214. },
  215. // bad method
  216. {
  217. "POST",
  218. RaftStreamPrefix + "/message/1",
  219. "1",
  220. "1",
  221. http.StatusMethodNotAllowed,
  222. },
  223. // bad method
  224. {
  225. "DELETE",
  226. RaftStreamPrefix + "/message/1",
  227. "1",
  228. "1",
  229. http.StatusMethodNotAllowed,
  230. },
  231. // bad path
  232. {
  233. "GET",
  234. RaftStreamPrefix + "/strange/1",
  235. "1",
  236. "1",
  237. http.StatusNotFound,
  238. },
  239. // bad path
  240. {
  241. "GET",
  242. RaftStreamPrefix + "/strange",
  243. "1",
  244. "1",
  245. http.StatusNotFound,
  246. },
  247. // non-existant peer
  248. {
  249. "GET",
  250. RaftStreamPrefix + "/message/2",
  251. "1",
  252. "1",
  253. http.StatusNotFound,
  254. },
  255. // wrong cluster ID
  256. {
  257. "GET",
  258. RaftStreamPrefix + "/message/1",
  259. "2",
  260. "1",
  261. http.StatusPreconditionFailed,
  262. },
  263. // wrong remote id
  264. {
  265. "GET",
  266. RaftStreamPrefix + "/message/1",
  267. "1",
  268. "2",
  269. http.StatusPreconditionFailed,
  270. },
  271. }
  272. for i, tt := range tests {
  273. req, err := http.NewRequest(tt.method, "http://localhost:7001"+tt.path, nil)
  274. if err != nil {
  275. t.Fatalf("#%d: could not create request: %#v", i, err)
  276. }
  277. req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
  278. req.Header.Set("X-Raft-To", tt.remote)
  279. rw := httptest.NewRecorder()
  280. peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}}
  281. h := newStreamHandler(peerGetter, types.ID(1), types.ID(1))
  282. h.ServeHTTP(rw, req)
  283. if rw.Code != tt.wcode {
  284. t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode)
  285. }
  286. }
  287. }
  288. func TestCloseNotifier(t *testing.T) {
  289. c := newCloseNotifier()
  290. select {
  291. case <-c.closeNotify():
  292. t.Fatalf("received unexpected close notification")
  293. default:
  294. }
  295. c.Close()
  296. select {
  297. case <-c.closeNotify():
  298. default:
  299. t.Fatalf("failed to get close notification")
  300. }
  301. }
  302. // errReader implements io.Reader to facilitate a broken request.
  303. type errReader struct{}
  304. func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") }
  305. type resWriterToError struct {
  306. code int
  307. }
  308. func (e *resWriterToError) Error() string { return "" }
  309. func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) }
  310. type fakePeerGetter struct {
  311. peers map[types.ID]Peer
  312. }
  313. func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
  314. type fakePeer struct {
  315. msgs []raftpb.Message
  316. urls types.URLs
  317. connc chan *outgoingConn
  318. }
  319. func newFakePeer() *fakePeer {
  320. return &fakePeer{
  321. connc: make(chan *outgoingConn, 1),
  322. }
  323. }
  324. func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
  325. func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls }
  326. func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
  327. func (pr *fakePeer) Stop() {}