peer_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rafthttp
  14. import (
  15. "errors"
  16. "io/ioutil"
  17. "net/http"
  18. "sync"
  19. "testing"
  20. "github.com/coreos/etcd/etcdserver/stats"
  21. "github.com/coreos/etcd/pkg/testutil"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. )
  25. // TestSenderSend tests that send func could post data using roundtripper
  26. // and increase success count in stats.
  27. func TestSenderSend(t *testing.T) {
  28. tr := &roundTripperRecorder{}
  29. fs := &stats.FollowerStats{}
  30. p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
  31. if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
  32. t.Fatalf("unexpect send error: %v", err)
  33. }
  34. p.Stop()
  35. if tr.Request() == nil {
  36. t.Errorf("sender fails to post the data")
  37. }
  38. fs.Lock()
  39. defer fs.Unlock()
  40. if fs.Counts.Success != 1 {
  41. t.Errorf("success = %d, want 1", fs.Counts.Success)
  42. }
  43. }
  44. func TestSenderExceedMaximalServing(t *testing.T) {
  45. tr := newRoundTripperBlocker()
  46. fs := &stats.FollowerStats{}
  47. p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
  48. // keep the sender busy and make the buffer full
  49. // nothing can go out as we block the sender
  50. for i := 0; i < connPerSender+senderBufSize; i++ {
  51. if err := p.Send(raftpb.Message{}); err != nil {
  52. t.Errorf("send err = %v, want nil", err)
  53. }
  54. // force the sender to grab data
  55. testutil.ForceGosched()
  56. }
  57. // try to send a data when we are sure the buffer is full
  58. if err := p.Send(raftpb.Message{}); err == nil {
  59. t.Errorf("unexpect send success")
  60. }
  61. // unblock the senders and force them to send out the data
  62. tr.unblock()
  63. testutil.ForceGosched()
  64. // It could send new data after previous ones succeed
  65. if err := p.Send(raftpb.Message{}); err != nil {
  66. t.Errorf("send err = %v, want nil", err)
  67. }
  68. p.Stop()
  69. }
  70. // TestSenderSendFailed tests that when send func meets the post error,
  71. // it increases fail count in stats.
  72. func TestSenderSendFailed(t *testing.T) {
  73. fs := &stats.FollowerStats{}
  74. p := NewPeer(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
  75. if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
  76. t.Fatalf("unexpect Send error: %v", err)
  77. }
  78. p.Stop()
  79. fs.Lock()
  80. defer fs.Unlock()
  81. if fs.Counts.Fail != 1 {
  82. t.Errorf("fail = %d, want 1", fs.Counts.Fail)
  83. }
  84. }
  85. func TestSenderPost(t *testing.T) {
  86. tr := &roundTripperRecorder{}
  87. p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil)
  88. if err := p.post([]byte("some data")); err != nil {
  89. t.Fatalf("unexpect post error: %v", err)
  90. }
  91. p.Stop()
  92. if g := tr.Request().Method; g != "POST" {
  93. t.Errorf("method = %s, want %s", g, "POST")
  94. }
  95. if g := tr.Request().URL.String(); g != "http://10.0.0.1" {
  96. t.Errorf("url = %s, want %s", g, "http://10.0.0.1")
  97. }
  98. if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
  99. t.Errorf("content type = %s, want %s", g, "application/protobuf")
  100. }
  101. if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" {
  102. t.Errorf("cluster id = %s, want %s", g, "1")
  103. }
  104. b, err := ioutil.ReadAll(tr.Request().Body)
  105. if err != nil {
  106. t.Fatalf("unexpected ReadAll error: %v", err)
  107. }
  108. if string(b) != "some data" {
  109. t.Errorf("body = %s, want %s", b, "some data")
  110. }
  111. }
  112. func TestSenderPostBad(t *testing.T) {
  113. tests := []struct {
  114. u string
  115. code int
  116. err error
  117. }{
  118. // bad url
  119. {":bad url", http.StatusNoContent, nil},
  120. // RoundTrip returns error
  121. {"http://10.0.0.1", 0, errors.New("blah")},
  122. // unexpected response status code
  123. {"http://10.0.0.1", http.StatusOK, nil},
  124. {"http://10.0.0.1", http.StatusCreated, nil},
  125. }
  126. for i, tt := range tests {
  127. p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, make(chan error))
  128. err := p.post([]byte("some data"))
  129. p.Stop()
  130. if err == nil {
  131. t.Errorf("#%d: err = nil, want not nil", i)
  132. }
  133. }
  134. }
  135. func TestPeerPostErrorc(t *testing.T) {
  136. tests := []struct {
  137. u string
  138. code int
  139. err error
  140. }{
  141. {"http://10.0.0.1", http.StatusForbidden, nil},
  142. {"http://10.0.0.1", http.StatusPreconditionFailed, nil},
  143. }
  144. for i, tt := range tests {
  145. errorc := make(chan error, 1)
  146. p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, errorc)
  147. p.post([]byte("some data"))
  148. p.Stop()
  149. select {
  150. case <-errorc:
  151. default:
  152. t.Fatalf("#%d: cannot receive from errorc", i)
  153. }
  154. }
  155. }
  156. type roundTripperBlocker struct {
  157. c chan struct{}
  158. }
  159. func newRoundTripperBlocker() *roundTripperBlocker {
  160. return &roundTripperBlocker{c: make(chan struct{})}
  161. }
  162. func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
  163. <-t.c
  164. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  165. }
  166. func (t *roundTripperBlocker) unblock() {
  167. close(t.c)
  168. }
  169. type respRoundTripper struct {
  170. code int
  171. err error
  172. }
  173. func newRespRoundTripper(code int, err error) *respRoundTripper {
  174. return &respRoundTripper{code: code, err: err}
  175. }
  176. func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  177. return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
  178. }
  179. type roundTripperRecorder struct {
  180. req *http.Request
  181. sync.Mutex
  182. }
  183. func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
  184. t.Lock()
  185. defer t.Unlock()
  186. t.req = req
  187. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  188. }
  189. func (t *roundTripperRecorder) Request() *http.Request {
  190. t.Lock()
  191. defer t.Unlock()
  192. return t.req
  193. }
  194. type nopReadCloser struct{}
  195. func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil }
  196. func (n *nopReadCloser) Close() error { return nil }