pipeline_test.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  15. import (
  16. "errors"
  17. "io"
  18. "io/ioutil"
  19. "net/http"
  20. "sync"
  21. "testing"
  22. "time"
  23. "github.com/coreos/etcd/etcdserver/stats"
  24. "github.com/coreos/etcd/pkg/testutil"
  25. "github.com/coreos/etcd/pkg/types"
  26. "github.com/coreos/etcd/raft/raftpb"
  27. "github.com/coreos/etcd/version"
  28. )
  29. // TestPipelineSend tests that pipeline could send data using roundtripper
  30. // and increase success count in stats.
  31. func TestPipelineSend(t *testing.T) {
  32. tr := &roundTripperRecorder{}
  33. picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
  34. fs := &stats.FollowerStats{}
  35. p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
  36. p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
  37. testutil.WaitSchedule()
  38. p.stop()
  39. if tr.Request() == nil {
  40. t.Errorf("sender fails to post the data")
  41. }
  42. fs.Lock()
  43. defer fs.Unlock()
  44. if fs.Counts.Success != 1 {
  45. t.Errorf("success = %d, want 1", fs.Counts.Success)
  46. }
  47. }
  48. func TestPipelineExceedMaximalServing(t *testing.T) {
  49. tr := newRoundTripperBlocker()
  50. picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
  51. fs := &stats.FollowerStats{}
  52. p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
  53. // keep the sender busy and make the buffer full
  54. // nothing can go out as we block the sender
  55. testutil.WaitSchedule()
  56. for i := 0; i < connPerPipeline+pipelineBufSize; i++ {
  57. select {
  58. case p.msgc <- raftpb.Message{}:
  59. default:
  60. t.Errorf("failed to send out message")
  61. }
  62. // force the sender to grab data
  63. testutil.WaitSchedule()
  64. }
  65. // try to send a data when we are sure the buffer is full
  66. select {
  67. case p.msgc <- raftpb.Message{}:
  68. t.Errorf("unexpected message sendout")
  69. default:
  70. }
  71. // unblock the senders and force them to send out the data
  72. tr.unblock()
  73. testutil.WaitSchedule()
  74. // It could send new data after previous ones succeed
  75. select {
  76. case p.msgc <- raftpb.Message{}:
  77. default:
  78. t.Errorf("failed to send out message")
  79. }
  80. p.stop()
  81. }
  82. // TestPipelineSendFailed tests that when send func meets the post error,
  83. // it increases fail count in stats.
  84. func TestPipelineSendFailed(t *testing.T) {
  85. picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
  86. fs := &stats.FollowerStats{}
  87. p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
  88. p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
  89. testutil.WaitSchedule()
  90. p.stop()
  91. fs.Lock()
  92. defer fs.Unlock()
  93. if fs.Counts.Fail != 1 {
  94. t.Errorf("fail = %d, want 1", fs.Counts.Fail)
  95. }
  96. }
  97. func TestPipelinePost(t *testing.T) {
  98. tr := &roundTripperRecorder{}
  99. picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
  100. p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
  101. if err := p.post([]byte("some data")); err != nil {
  102. t.Fatalf("unexpect post error: %v", err)
  103. }
  104. p.stop()
  105. if g := tr.Request().Method; g != "POST" {
  106. t.Errorf("method = %s, want %s", g, "POST")
  107. }
  108. if g := tr.Request().URL.String(); g != "http://localhost:2380/raft" {
  109. t.Errorf("url = %s, want %s", g, "http://localhost:2380/raft")
  110. }
  111. if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
  112. t.Errorf("content type = %s, want %s", g, "application/protobuf")
  113. }
  114. if g := tr.Request().Header.Get("X-Server-Version"); g != version.Version {
  115. t.Errorf("version = %s, want %s", g, version.Version)
  116. }
  117. if g := tr.Request().Header.Get("X-Min-Cluster-Version"); g != version.MinClusterVersion {
  118. t.Errorf("min version = %s, want %s", g, version.MinClusterVersion)
  119. }
  120. if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" {
  121. t.Errorf("cluster id = %s, want %s", g, "1")
  122. }
  123. b, err := ioutil.ReadAll(tr.Request().Body)
  124. if err != nil {
  125. t.Fatalf("unexpected ReadAll error: %v", err)
  126. }
  127. if string(b) != "some data" {
  128. t.Errorf("body = %s, want %s", b, "some data")
  129. }
  130. }
  131. func TestPipelinePostBad(t *testing.T) {
  132. tests := []struct {
  133. u string
  134. code int
  135. err error
  136. }{
  137. // RoundTrip returns error
  138. {"http://localhost:2380", 0, errors.New("blah")},
  139. // unexpected response status code
  140. {"http://localhost:2380", http.StatusOK, nil},
  141. {"http://localhost:2380", http.StatusCreated, nil},
  142. }
  143. for i, tt := range tests {
  144. picker := mustNewURLPicker(t, []string{tt.u})
  145. p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
  146. err := p.post([]byte("some data"))
  147. p.stop()
  148. if err == nil {
  149. t.Errorf("#%d: err = nil, want not nil", i)
  150. }
  151. }
  152. }
  153. func TestPipelinePostErrorc(t *testing.T) {
  154. tests := []struct {
  155. u string
  156. code int
  157. err error
  158. }{
  159. {"http://localhost:2380", http.StatusForbidden, nil},
  160. }
  161. for i, tt := range tests {
  162. picker := mustNewURLPicker(t, []string{tt.u})
  163. errorc := make(chan error, 1)
  164. p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
  165. p.post([]byte("some data"))
  166. p.stop()
  167. select {
  168. case <-errorc:
  169. default:
  170. t.Fatalf("#%d: cannot receive from errorc", i)
  171. }
  172. }
  173. }
  174. func TestStopBlockedPipeline(t *testing.T) {
  175. picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
  176. p := newPipeline(newRoundTripperBlocker(), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
  177. // send many messages that most of them will be blocked in buffer
  178. for i := 0; i < connPerPipeline*10; i++ {
  179. p.msgc <- raftpb.Message{}
  180. }
  181. done := make(chan struct{})
  182. go func() {
  183. p.stop()
  184. done <- struct{}{}
  185. }()
  186. select {
  187. case <-done:
  188. case <-time.After(time.Second):
  189. t.Fatalf("failed to stop pipeline in 1s")
  190. }
  191. }
  192. type roundTripperBlocker struct {
  193. c chan error
  194. mu sync.Mutex
  195. unblocked bool
  196. }
  197. func newRoundTripperBlocker() *roundTripperBlocker {
  198. return &roundTripperBlocker{c: make(chan error)}
  199. }
  200. func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
  201. err := <-t.c
  202. if err != nil {
  203. return nil, err
  204. }
  205. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  206. }
  207. func (t *roundTripperBlocker) unblock() {
  208. t.mu.Lock()
  209. t.unblocked = true
  210. t.mu.Unlock()
  211. close(t.c)
  212. }
  213. func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
  214. t.mu.Lock()
  215. defer t.mu.Unlock()
  216. if t.unblocked {
  217. return
  218. }
  219. t.c <- errors.New("request canceled")
  220. }
  221. type respRoundTripper struct {
  222. code int
  223. header http.Header
  224. err error
  225. }
  226. func newRespRoundTripper(code int, err error) *respRoundTripper {
  227. return &respRoundTripper{code: code, err: err}
  228. }
  229. func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  230. return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
  231. }
  232. type roundTripperRecorder struct {
  233. req *http.Request
  234. sync.Mutex
  235. }
  236. func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
  237. t.Lock()
  238. defer t.Unlock()
  239. t.req = req
  240. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  241. }
  242. func (t *roundTripperRecorder) Request() *http.Request {
  243. t.Lock()
  244. defer t.Unlock()
  245. return t.req
  246. }
  247. type nopReadCloser struct{}
  248. func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }
  249. func (n *nopReadCloser) Close() error { return nil }