sender_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  import (
  	"errors"
  	"io"
  	"io/ioutil"
  	"net/http"
  	"sync"
  	"testing"
  	"time"

  	"github.com/coreos/etcd/etcdserver/stats"
  	"github.com/coreos/etcd/pkg/testutil"
  	"github.com/coreos/etcd/pkg/types"
  )
  25. func TestSendHubInitSenders(t *testing.T) {
  26. membs := []*Member{
  27. newTestMember(1, []string{"http://a"}, "", nil),
  28. newTestMember(2, []string{"http://b"}, "", nil),
  29. newTestMember(3, []string{"http://c"}, "", nil),
  30. }
  31. cl := newTestCluster(membs)
  32. ls := stats.NewLeaderStats("")
  33. h := newSendHub(nil, cl, nil, ls)
  34. ids := cl.MemberIDs()
  35. if len(h.senders) != len(ids) {
  36. t.Errorf("len(ids) = %d, want %d", len(h.senders), len(ids))
  37. }
  38. for _, id := range ids {
  39. if _, ok := h.senders[id]; !ok {
  40. t.Errorf("senders[%s] is nil, want exists", id)
  41. }
  42. }
  43. }
  44. func TestSendHubAdd(t *testing.T) {
  45. cl := newTestCluster(nil)
  46. ls := stats.NewLeaderStats("")
  47. h := newSendHub(nil, cl, nil, ls)
  48. m := newTestMember(1, []string{"http://a"}, "", nil)
  49. h.Add(m)
  50. if _, ok := ls.Followers["1"]; !ok {
  51. t.Errorf("FollowerStats[1] is nil, want exists")
  52. }
  53. s, ok := h.senders[types.ID(1)]
  54. if !ok {
  55. t.Fatalf("senders[1] is nil, want exists")
  56. }
  57. if s.u != "http://a/raft" {
  58. t.Errorf("url = %s, want %s", s.u, "http://a/raft")
  59. }
  60. h.Add(m)
  61. ns := h.senders[types.ID(1)]
  62. if s != ns {
  63. t.Errorf("sender = %p, want %p", ns, s)
  64. }
  65. }
  66. func TestSendHubRemove(t *testing.T) {
  67. membs := []*Member{
  68. newTestMember(1, []string{"http://a"}, "", nil),
  69. }
  70. cl := newTestCluster(membs)
  71. ls := stats.NewLeaderStats("")
  72. h := newSendHub(nil, cl, nil, ls)
  73. h.Remove(types.ID(1))
  74. if _, ok := h.senders[types.ID(1)]; ok {
  75. t.Fatalf("senders[1] exists, want removed")
  76. }
  77. }
  78. func TestSendHubShouldStop(t *testing.T) {
  79. membs := []*Member{
  80. newTestMember(1, []string{"http://a"}, "", nil),
  81. }
  82. tr := newRespRoundTripper(http.StatusForbidden, nil)
  83. cl := newTestCluster(membs)
  84. ls := stats.NewLeaderStats("")
  85. h := newSendHub(tr, cl, nil, ls)
  86. shouldstop := h.ShouldStopNotify()
  87. select {
  88. case <-shouldstop:
  89. t.Fatalf("received unexpected shouldstop notification")
  90. case <-time.After(10 * time.Millisecond):
  91. }
  92. h.senders[1].send([]byte("somedata"))
  93. testutil.ForceGosched()
  94. select {
  95. case <-shouldstop:
  96. default:
  97. t.Fatalf("cannot receive stop notification")
  98. }
  99. }
  100. // TestSenderSend tests that send func could post data using roundtripper
  101. // and increase success count in stats.
  102. func TestSenderSend(t *testing.T) {
  103. tr := &roundTripperRecorder{}
  104. fs := &stats.FollowerStats{}
  105. s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
  106. if err := s.send([]byte("some data")); err != nil {
  107. t.Fatalf("unexpect send error: %v", err)
  108. }
  109. s.stop()
  110. if tr.Request() == nil {
  111. t.Errorf("sender fails to post the data")
  112. }
  113. fs.Lock()
  114. defer fs.Unlock()
  115. if fs.Counts.Success != 1 {
  116. t.Errorf("success = %d, want 1", fs.Counts.Success)
  117. }
  118. }
  119. func TestSenderExceedMaximalServing(t *testing.T) {
  120. tr := newRoundTripperBlocker()
  121. fs := &stats.FollowerStats{}
  122. s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
  123. // keep the sender busy and make the buffer full
  124. // nothing can go out as we block the sender
  125. for i := 0; i < connPerSender+senderBufSize; i++ {
  126. if err := s.send([]byte("some data")); err != nil {
  127. t.Errorf("send err = %v, want nil", err)
  128. }
  129. // force the sender to grab data
  130. testutil.ForceGosched()
  131. }
  132. // try to send a data when we are sure the buffer is full
  133. if err := s.send([]byte("some data")); err == nil {
  134. t.Errorf("unexpect send success")
  135. }
  136. // unblock the senders and force them to send out the data
  137. tr.unblock()
  138. testutil.ForceGosched()
  139. // It could send new data after previous ones succeed
  140. if err := s.send([]byte("some data")); err != nil {
  141. t.Errorf("send err = %v, want nil", err)
  142. }
  143. s.stop()
  144. }
  145. // TestSenderSendFailed tests that when send func meets the post error,
  146. // it increases fail count in stats.
  147. func TestSenderSendFailed(t *testing.T) {
  148. fs := &stats.FollowerStats{}
  149. s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
  150. if err := s.send([]byte("some data")); err != nil {
  151. t.Fatalf("unexpect send error: %v", err)
  152. }
  153. s.stop()
  154. fs.Lock()
  155. defer fs.Unlock()
  156. if fs.Counts.Fail != 1 {
  157. t.Errorf("fail = %d, want 1", fs.Counts.Fail)
  158. }
  159. }
  160. func TestSenderPost(t *testing.T) {
  161. tr := &roundTripperRecorder{}
  162. s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil)
  163. if err := s.post([]byte("some data")); err != nil {
  164. t.Fatalf("unexpect post error: %v", err)
  165. }
  166. s.stop()
  167. if g := tr.Request().Method; g != "POST" {
  168. t.Errorf("method = %s, want %s", g, "POST")
  169. }
  170. if g := tr.Request().URL.String(); g != "http://10.0.0.1" {
  171. t.Errorf("url = %s, want %s", g, "http://10.0.0.1")
  172. }
  173. if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
  174. t.Errorf("content type = %s, want %s", g, "application/protobuf")
  175. }
  176. if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" {
  177. t.Errorf("cluster id = %s, want %s", g, "1")
  178. }
  179. b, err := ioutil.ReadAll(tr.Request().Body)
  180. if err != nil {
  181. t.Fatalf("unexpected ReadAll error: %v", err)
  182. }
  183. if string(b) != "some data" {
  184. t.Errorf("body = %s, want %s", b, "some data")
  185. }
  186. }
  187. func TestSenderPostBad(t *testing.T) {
  188. tests := []struct {
  189. u string
  190. code int
  191. err error
  192. }{
  193. // bad url
  194. {":bad url", http.StatusNoContent, nil},
  195. // RoundTrip returns error
  196. {"http://10.0.0.1", 0, errors.New("blah")},
  197. // unexpected response status code
  198. {"http://10.0.0.1", http.StatusOK, nil},
  199. {"http://10.0.0.1", http.StatusCreated, nil},
  200. }
  201. for i, tt := range tests {
  202. shouldstop := make(chan struct{})
  203. s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
  204. err := s.post([]byte("some data"))
  205. s.stop()
  206. if err == nil {
  207. t.Errorf("#%d: err = nil, want not nil", i)
  208. }
  209. }
  210. }
  211. func TestSenderPostShouldStop(t *testing.T) {
  212. tests := []struct {
  213. u string
  214. code int
  215. err error
  216. }{
  217. {"http://10.0.0.1", http.StatusForbidden, nil},
  218. {"http://10.0.0.1", http.StatusPreconditionFailed, nil},
  219. }
  220. for i, tt := range tests {
  221. shouldstop := make(chan struct{}, 1)
  222. s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop)
  223. s.post([]byte("some data"))
  224. s.stop()
  225. select {
  226. case <-shouldstop:
  227. default:
  228. t.Fatalf("#%d: cannot receive shouldstop notification", i)
  229. }
  230. }
  231. }
  232. type roundTripperBlocker struct {
  233. c chan struct{}
  234. }
  235. func newRoundTripperBlocker() *roundTripperBlocker {
  236. return &roundTripperBlocker{c: make(chan struct{})}
  237. }
  238. func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) {
  239. <-t.c
  240. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  241. }
  242. func (t *roundTripperBlocker) unblock() {
  243. close(t.c)
  244. }
  245. type respRoundTripper struct {
  246. code int
  247. err error
  248. }
  249. func newRespRoundTripper(code int, err error) *respRoundTripper {
  250. return &respRoundTripper{code: code, err: err}
  251. }
  252. func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  253. return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err
  254. }
  255. type roundTripperRecorder struct {
  256. req *http.Request
  257. sync.Mutex
  258. }
  259. func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
  260. t.Lock()
  261. defer t.Unlock()
  262. t.req = req
  263. return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
  264. }
  265. func (t *roundTripperRecorder) Request() *http.Request {
  266. t.Lock()
  267. defer t.Unlock()
  268. return t.req
  269. }
  // nopReadCloser is an always-empty io.ReadCloser used as the body of the
  // canned responses in these tests.
  type nopReadCloser struct{}

  // Read reports end of stream immediately. Returning (0, nil) instead would
  // make any caller that reads until EOF loop forever.
  func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }

  // Close does nothing and always succeeds.
  func (n *nopReadCloser) Close() error { return nil }