etcd_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package etcd
  2. import (
  3. "fmt"
  4. "net/http"
  5. "net/http/httptest"
  6. "net/url"
  7. "testing"
  8. "time"
  9. "github.com/coreos/etcd/config"
  10. )
  11. func TestMultipleNodes(t *testing.T) {
  12. tests := []int{1, 3, 5, 9, 11}
  13. for _, tt := range tests {
  14. es, hs := buildCluster(tt, false)
  15. waitCluster(t, es)
  16. for i := range es {
  17. es[len(es)-i-1].Stop()
  18. }
  19. for i := range hs {
  20. hs[len(hs)-i-1].Close()
  21. }
  22. }
  23. afterTest(t)
  24. }
  25. func TestMultipleTLSNodes(t *testing.T) {
  26. tests := []int{1, 3, 5}
  27. for _, tt := range tests {
  28. es, hs := buildCluster(tt, true)
  29. waitCluster(t, es)
  30. for i := range es {
  31. es[len(es)-i-1].Stop()
  32. }
  33. for i := range hs {
  34. hs[len(hs)-i-1].Close()
  35. }
  36. }
  37. afterTest(t)
  38. }
  39. func TestV2Redirect(t *testing.T) {
  40. es, hs := buildCluster(3, false)
  41. waitCluster(t, es)
  42. u := hs[1].URL
  43. ru := fmt.Sprintf("%s%s", hs[0].URL, "/v2/keys/foo")
  44. tc := NewTestClient()
  45. v := url.Values{}
  46. v.Set("value", "XXX")
  47. resp, _ := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v)
  48. if resp.StatusCode != http.StatusTemporaryRedirect {
  49. t.Errorf("status = %d, want %d", resp.StatusCode, http.StatusTemporaryRedirect)
  50. }
  51. location, err := resp.Location()
  52. if err != nil {
  53. t.Errorf("want err = %, want nil", err)
  54. }
  55. if location.String() != ru {
  56. t.Errorf("location = %v, want %v", location.String(), ru)
  57. }
  58. resp.Body.Close()
  59. for i := range es {
  60. es[len(es)-i-1].Stop()
  61. }
  62. for i := range hs {
  63. hs[len(hs)-i-1].Close()
  64. }
  65. afterTest(t)
  66. }
  67. func TestAdd(t *testing.T) {
  68. tests := []int{3, 4, 5, 6}
  69. for _, tt := range tests {
  70. es := make([]*Server, tt)
  71. hs := make([]*httptest.Server, tt)
  72. for i := 0; i < tt; i++ {
  73. c := config.New()
  74. if i > 0 {
  75. c.Peers = []string{hs[0].URL}
  76. }
  77. es[i], hs[i] = initTestServer(c, int64(i), false)
  78. }
  79. go es[0].Bootstrap()
  80. for i := 1; i < tt; i++ {
  81. id := int64(i)
  82. for {
  83. lead := es[0].node.Leader()
  84. if lead == -1 {
  85. time.Sleep(defaultElection * es[0].tickDuration)
  86. continue
  87. }
  88. err := es[lead].Add(id, es[id].raftPubAddr, es[id].pubAddr)
  89. if err == nil {
  90. break
  91. }
  92. switch err {
  93. case tmpErr:
  94. time.Sleep(defaultElection * es[0].tickDuration)
  95. case serverStopErr:
  96. t.Fatalf("#%d on %d: unexpected stop", i, lead)
  97. default:
  98. t.Fatal(err)
  99. }
  100. }
  101. go es[i].run()
  102. for j := 0; j <= i; j++ {
  103. p := fmt.Sprintf("%s/%d", v2machineKVPrefix, id)
  104. w, err := es[j].Watch(p, false, false, 1)
  105. if err != nil {
  106. t.Errorf("#%d on %d: %v", i, j, err)
  107. break
  108. }
  109. <-w.EventChan
  110. }
  111. }
  112. for i := range hs {
  113. es[len(hs)-i-1].Stop()
  114. }
  115. for i := range hs {
  116. hs[len(hs)-i-1].Close()
  117. }
  118. }
  119. afterTest(t)
  120. }
  121. func TestRemove(t *testing.T) {
  122. tests := []int{3, 4, 5, 6}
  123. for _, tt := range tests {
  124. es, hs := buildCluster(tt, false)
  125. waitCluster(t, es)
  126. // we don't remove the machine from 2-node cluster because it is
  127. // not 100 percent safe in our raft.
  128. // TODO(yichengq): improve it later.
  129. for i := 0; i < tt-2; i++ {
  130. id := int64(i)
  131. send := id
  132. for {
  133. send++
  134. if send > int64(tt-1) {
  135. send = id
  136. }
  137. lead := es[send].node.Leader()
  138. if lead == -1 {
  139. time.Sleep(defaultElection * 5 * time.Millisecond)
  140. continue
  141. }
  142. err := es[lead].Remove(id)
  143. if err == nil {
  144. break
  145. }
  146. switch err {
  147. case tmpErr:
  148. time.Sleep(defaultElection * 5 * time.Millisecond)
  149. case serverStopErr:
  150. if lead == id {
  151. break
  152. }
  153. default:
  154. t.Fatal(err)
  155. }
  156. }
  157. <-es[i].stop
  158. }
  159. for i := range es {
  160. es[len(hs)-i-1].Stop()
  161. }
  162. for i := range hs {
  163. hs[len(hs)-i-1].Close()
  164. }
  165. }
  166. afterTest(t)
  167. }
  168. func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
  169. bootstrapper := 0
  170. es := make([]*Server, number)
  171. hs := make([]*httptest.Server, number)
  172. var seed string
  173. for i := range es {
  174. c := config.New()
  175. c.Peers = []string{seed}
  176. es[i], hs[i] = initTestServer(c, int64(i), tls)
  177. if i == bootstrapper {
  178. seed = hs[i].URL
  179. go es[i].Bootstrap()
  180. } else {
  181. // wait for the previous configuration change to be committed
  182. // or this configuration request might be dropped
  183. w, err := es[0].Watch(v2machineKVPrefix, true, false, uint64(i))
  184. if err != nil {
  185. panic(err)
  186. }
  187. <-w.EventChan
  188. go es[i].Join()
  189. }
  190. }
  191. return es, hs
  192. }
  193. func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptest.Server) {
  194. e = New(c, id)
  195. e.SetTick(time.Millisecond * 5)
  196. m := http.NewServeMux()
  197. m.Handle("/", e)
  198. m.Handle("/raft", e.t)
  199. m.Handle("/raft/", e.t)
  200. if tls {
  201. h = httptest.NewTLSServer(m)
  202. } else {
  203. h = httptest.NewServer(m)
  204. }
  205. e.raftPubAddr = h.URL
  206. e.pubAddr = h.URL
  207. return
  208. }
  209. func waitCluster(t *testing.T, es []*Server) {
  210. n := len(es)
  211. for i, e := range es {
  212. var index uint64
  213. for k := 0; k < n; k++ {
  214. index++
  215. w, err := e.Watch(v2machineKVPrefix, true, false, index)
  216. if err != nil {
  217. panic(err)
  218. }
  219. v := <-w.EventChan
  220. // join command may appear several times due to retry
  221. // when timeout
  222. if k > 0 {
  223. pw := fmt.Sprintf("%s/%d", v2machineKVPrefix, k-1)
  224. if v.Node.Key == pw {
  225. continue
  226. }
  227. }
  228. ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, k)
  229. if v.Node.Key != ww {
  230. t.Errorf("#%d path = %v, want %v", i, v.Node.Key, ww)
  231. }
  232. }
  233. }
  234. }