pipeline_test.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  import (
  	"errors"
  	"io"
  	"io/ioutil"
  	"net/http"
  	"sync"
  	"testing"

  	"github.com/coreos/etcd/etcdserver/stats"
  	"github.com/coreos/etcd/pkg/testutil"
  	"github.com/coreos/etcd/pkg/types"
  	"github.com/coreos/etcd/raft/raftpb"
  )
  26. // TestPipelineSend tests that pipeline could send data using roundtripper
  27. // and increase success count in stats.
  28. func TestPipelineSend(t *testing.T) {
  29. tr := &roundTripperRecorder{}
  30. fs := &stats.FollowerStats{}
  31. p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
  32. p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
  33. p.stop()
  34. if tr.Request() == nil {
  35. t.Errorf("sender fails to post the data")
  36. }
  37. fs.Lock()
  38. defer fs.Unlock()
  39. if fs.Counts.Success != 1 {
  40. t.Errorf("success = %d, want 1", fs.Counts.Success)
  41. }
  42. }
  43. func TestPipelineExceedMaximalServing(t *testing.T) {
  44. tr := newRoundTripperBlocker()
  45. fs := &stats.FollowerStats{}
  46. p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
  47. // keep the sender busy and make the buffer full
  48. // nothing can go out as we block the sender
  49. testutil.ForceGosched()
  50. for i := 0; i < connPerPipeline+pipelineBufSize; i++ {
  51. select {
  52. case p.msgc <- raftpb.Message{}:
  53. default:
  54. t.Errorf("failed to send out message")
  55. }
  56. // force the sender to grab data
  57. testutil.ForceGosched()
  58. }
  59. // try to send a data when we are sure the buffer is full
  60. select {
  61. case p.msgc <- raftpb.Message{}:
  62. t.Errorf("unexpected message sendout")
  63. default:
  64. }
  65. // unblock the senders and force them to send out the data
  66. tr.unblock()
  67. testutil.ForceGosched()
  68. // It could send new data after previous ones succeed
  69. select {
  70. case p.msgc <- raftpb.Message{}:
  71. default:
  72. t.Errorf("failed to send out message")
  73. }
  74. p.stop()
  75. }
  76. // TestPipelineSendFailed tests that when send func meets the post error,
  77. // it increases fail count in stats.
  78. func TestPipelineSendFailed(t *testing.T) {
  79. fs := &stats.FollowerStats{}
  80. p := newPipeline(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
  81. p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
  82. p.stop()
  83. fs.Lock()
  84. defer fs.Unlock()
  85. if fs.Counts.Fail != 1 {
  86. t.Errorf("fail = %d, want 1", fs.Counts.Fail)
  87. }
  88. }
  89. func TestPipelinePost(t *testing.T) {
  90. tr := &roundTripperRecorder{}
  91. p := newPipeline(tr, "http://10.0.0.1", types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
  92. if err := p.post([]byte("some data")); err != nil {
  93. t.Fatalf("unexpect post error: %v", err)
  94. }
  95. p.stop()
  96. if g := tr.Request().Method; g != "POST" {
  97. t.Errorf("method = %s, want %s", g, "POST")
  98. }
  99. if g := tr.Request().URL.String(); g != "http://10.0.0.1" {
  100. t.Errorf("url = %s, want %s", g, "http://10.0.0.1")
  101. }
  102. if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
  103. t.Errorf("content type = %s, want %s", g, "application/protobuf")
  104. }
  105. if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" {
  106. t.Errorf("cluster id = %s, want %s", g, "1")
  107. }
  108. b, err := ioutil.ReadAll(tr.Request().Body)
  109. if err != nil {
  110. t.Fatalf("unexpected ReadAll error: %v", err)
  111. }
  112. if string(b) != "some data" {
  113. t.Errorf("body = %s, want %s", b, "some data")
  114. }
  115. }
  116. func TestPipelinePostBad(t *testing.T) {
  117. tests := []struct {
  118. u string
  119. code int
  120. err error
  121. }{
  122. // bad url
  123. {":bad url", http.StatusNoContent, nil},
  124. // RoundTrip returns error
  125. {"http://10.0.0.1", 0, errors.New("blah")},
  126. // unexpected response status code
  127. {"http://10.0.0.1", http.StatusOK, nil},
  128. {"http://10.0.0.1", http.StatusCreated, nil},
  129. }
  130. for i, tt := range tests {
  131. p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
  132. err := p.post([]byte("some data"))
  133. p.stop()
  134. if err == nil {
  135. t.Errorf("#%d: err = nil, want not nil", i)
  136. }
  137. }
  138. }
  139. func TestPipelinePostErrorc(t *testing.T) {
  140. tests := []struct {
  141. u string
  142. code int
  143. err error
  144. }{
  145. {"http://10.0.0.1", http.StatusForbidden, nil},
  146. {"http://10.0.0.1", http.StatusPreconditionFailed, nil},
  147. }
  148. for i, tt := range tests {
  149. errorc := make(chan error, 1)
  150. p := newPipeline(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
  151. p.post([]byte("some data"))
  152. p.stop()
  153. select {
  154. case <-errorc:
  155. default:
  156. t.Fatalf("#%d: cannot receive from errorc", i)
  157. }
  158. }
  159. }
  160. type roundTripperBlocker struct {
  161. c chan struct{}
  162. }
  163. func newRoundTripperBlocker() *roundTripperBlocker {
  164. return &roundTripperBlocker{c: make(chan struct{})}
  165. }
  166. func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
  167. <-t.c
  168. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  169. }
  170. func (t *roundTripperBlocker) unblock() {
  171. close(t.c)
  172. }
  173. type respRoundTripper struct {
  174. code int
  175. err error
  176. }
  177. func newRespRoundTripper(code int, err error) *respRoundTripper {
  178. return &respRoundTripper{code: code, err: err}
  179. }
  180. func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  181. return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
  182. }
  183. type roundTripperRecorder struct {
  184. req *http.Request
  185. sync.Mutex
  186. }
  187. func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
  188. t.Lock()
  189. defer t.Unlock()
  190. t.req = req
  191. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  192. }
  193. func (t *roundTripperRecorder) Request() *http.Request {
  194. t.Lock()
  195. defer t.Unlock()
  196. return t.req
  197. }
  // nopReadCloser is an empty io.ReadCloser used as the response body by the
  // test round trippers.
  type nopReadCloser struct{}

  // Read reads nothing and reports io.EOF, so that callers draining the body
  // (for example ioutil.ReadAll) terminate instead of looping on a (0, nil)
  // result.
  func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }

  // Close does nothing and always returns nil.
  func (n *nopReadCloser) Close() error { return nil }