etcd_functional_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "math/rand"
  16. "net/http/httptest"
  17. "testing"
  18. "time"
  19. "github.com/coreos/etcd/config"
  20. "github.com/coreos/etcd/store"
  21. )
  22. func TestKillLeader(t *testing.T) {
  23. tests := []int{3, 5, 9, 11}
  24. for i, tt := range tests {
  25. es, hs := buildCluster(tt, false)
  26. waitCluster(t, es)
  27. waitLeader(es)
  28. lead := es[0].p.node.Leader()
  29. es[lead].Stop()
  30. time.Sleep(es[0].tickDuration * defaultElection * 2)
  31. waitLeader(es)
  32. if es[1].p.node.Leader() == 0 {
  33. t.Errorf("#%d: lead = %d, want not 0", i, es[1].p.node.Leader())
  34. }
  35. for i := range es {
  36. es[len(es)-i-1].Stop()
  37. }
  38. for i := range hs {
  39. hs[len(hs)-i-1].Close()
  40. }
  41. }
  42. afterTest(t)
  43. }
  44. func TestRandomKill(t *testing.T) {
  45. tests := []int{3, 5, 9, 11}
  46. for _, tt := range tests {
  47. es, hs := buildCluster(tt, false)
  48. waitCluster(t, es)
  49. waitLeader(es)
  50. toKill := make(map[int64]struct{})
  51. for len(toKill) != tt/2-1 {
  52. toKill[rand.Int63n(int64(tt))] = struct{}{}
  53. }
  54. for k := range toKill {
  55. es[k].Stop()
  56. }
  57. time.Sleep(es[0].tickDuration * defaultElection * 2)
  58. waitLeader(es)
  59. for i := range es {
  60. es[len(es)-i-1].Stop()
  61. }
  62. for i := range hs {
  63. hs[len(hs)-i-1].Close()
  64. }
  65. }
  66. afterTest(t)
  67. }
  68. func TestJoinThroughFollower(t *testing.T) {
  69. tests := []int{3, 4, 5, 6}
  70. for _, tt := range tests {
  71. es := make([]*Server, tt)
  72. hs := make([]*httptest.Server, tt)
  73. for i := 0; i < tt; i++ {
  74. c := config.New()
  75. if i > 0 {
  76. c.Peers = []string{hs[i-1].URL}
  77. }
  78. es[i], hs[i] = initTestServer(c, int64(i), false)
  79. }
  80. go es[0].Run()
  81. for i := 1; i < tt; i++ {
  82. go es[i].Run()
  83. waitLeader(es[:i])
  84. }
  85. waitCluster(t, es)
  86. for i := range hs {
  87. es[len(hs)-i-1].Stop()
  88. }
  89. for i := range hs {
  90. hs[len(hs)-i-1].Close()
  91. }
  92. }
  93. afterTest(t)
  94. }
  95. func BenchmarkEndToEndSet(b *testing.B) {
  96. es, hs := buildCluster(3, false)
  97. waitLeader(es)
  98. b.ResetTimer()
  99. for n := 0; n < b.N; n++ {
  100. _, err := es[0].p.Set("foo", false, "bar", store.Permanent)
  101. if err != nil {
  102. panic("unexpect error")
  103. }
  104. }
  105. b.StopTimer()
  106. for i := range hs {
  107. es[len(hs)-i-1].Stop()
  108. }
  109. for i := range hs {
  110. hs[len(hs)-i-1].Close()
  111. }
  112. }
  113. // TODO(yichengq): cannot handle previous msgDenial correctly now
  114. func TestModeSwitch(t *testing.T) {
  115. t.Skip("not passed")
  116. size := 5
  117. round := 3
  118. for i := 0; i < size; i++ {
  119. es, hs := buildCluster(size, false)
  120. waitCluster(t, es)
  121. config := config.NewClusterConfig()
  122. config.SyncInterval = 0
  123. id := int64(i)
  124. for j := 0; j < round; j++ {
  125. lead, _ := waitActiveLeader(es)
  126. // cluster only demotes follower
  127. if lead == id {
  128. continue
  129. }
  130. config.ActiveSize = size - 1
  131. if err := es[lead].p.setClusterConfig(config); err != nil {
  132. t.Fatalf("#%d: setClusterConfig err = %v", i, err)
  133. }
  134. if err := es[lead].p.remove(id); err != nil {
  135. t.Fatalf("#%d: remove err = %v", i, err)
  136. }
  137. waitMode(standbyMode, es[i])
  138. for k := 0; k < 4; k++ {
  139. if es[i].s.leader != noneId {
  140. break
  141. }
  142. time.Sleep(20 * time.Millisecond)
  143. }
  144. if g := es[i].s.leader; g != lead {
  145. t.Errorf("#%d: lead = %d, want %d", i, g, lead)
  146. }
  147. config.ActiveSize = size
  148. if err := es[lead].p.setClusterConfig(config); err != nil {
  149. t.Fatalf("#%d: setClusterConfig err = %v", i, err)
  150. }
  151. waitMode(participantMode, es[i])
  152. if err := checkParticipant(i, es); err != nil {
  153. t.Errorf("#%d: check alive err = %v", i, err)
  154. }
  155. }
  156. for i := range hs {
  157. es[len(hs)-i-1].Stop()
  158. }
  159. for i := range hs {
  160. hs[len(hs)-i-1].Close()
  161. }
  162. }
  163. afterTest(t)
  164. }
  165. type leadterm struct {
  166. lead int64
  167. term int64
  168. }
  169. func waitActiveLeader(es []*Server) (lead, term int64) {
  170. for {
  171. if l, t := waitLeader(es); l >= 0 && es[l].mode.Get() == participantMode {
  172. return l, t
  173. }
  174. }
  175. }
  176. // waitLeader waits until all alive servers are checked to have the same leader.
  177. // WARNING: The lead returned is not guaranteed to be actual leader.
  178. func waitLeader(es []*Server) (lead, term int64) {
  179. for {
  180. ls := make([]leadterm, 0, len(es))
  181. for i := range es {
  182. switch es[i].mode.Get() {
  183. case participantMode:
  184. ls = append(ls, getLead(es[i]))
  185. case standbyMode:
  186. //TODO(xiangli) add standby support
  187. case stopMode:
  188. }
  189. }
  190. if isSameLead(ls) {
  191. return ls[0].lead, ls[0].term
  192. }
  193. time.Sleep(es[0].tickDuration * defaultElection)
  194. }
  195. }
  196. func getLead(s *Server) leadterm {
  197. return leadterm{s.p.node.Leader(), s.p.node.Term()}
  198. }
  199. func isSameLead(ls []leadterm) bool {
  200. m := make(map[leadterm]int)
  201. for i := range ls {
  202. m[ls[i]] = m[ls[i]] + 1
  203. }
  204. if len(m) == 1 {
  205. if ls[0].lead == -1 {
  206. return false
  207. }
  208. return true
  209. }
  210. // todo(xiangli): printout the current cluster status for debugging....
  211. return false
  212. }