etcd_functional_test.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "fmt"
  16. "math/rand"
  17. "net/http/httptest"
  18. "reflect"
  19. "testing"
  20. "time"
  21. "github.com/coreos/etcd/config"
  22. "github.com/coreos/etcd/store"
  23. "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
  24. )
  25. func TestKillLeader(t *testing.T) {
  26. tests := []int{3, 5, 9}
  27. for i, tt := range tests {
  28. es, hs := buildCluster(tt, false)
  29. waitCluster(t, es)
  30. var totalTime time.Duration
  31. for j := 0; j < tt; j++ {
  32. lead, _ := waitLeader(es)
  33. es[lead].Stop()
  34. hs[lead].Close()
  35. time.Sleep(es[0].tickDuration * defaultElection * 2)
  36. start := time.Now()
  37. if g, _ := waitLeader(es); g == lead {
  38. t.Errorf("#%d.%d: lead = %d, want not %d", i, j, g, lead)
  39. }
  40. take := time.Now().Sub(start)
  41. totalTime += take
  42. avgTime := totalTime / (time.Duration)(i+1)
  43. fmt.Println("Total time:", totalTime, "; Avg time:", avgTime)
  44. c := config.New()
  45. c.DataDir = es[lead].config.DataDir
  46. c.Addr = hs[lead].Listener.Addr().String()
  47. id := es[lead].id
  48. e, h, err := buildServer(t, c, id)
  49. if err != nil {
  50. t.Fatalf("#%d.%d: %v", i, j, err)
  51. }
  52. es[lead] = e
  53. hs[lead] = h
  54. }
  55. destoryCluster(t, es, hs)
  56. }
  57. afterTest(t)
  58. }
  59. func TestKillRandom(t *testing.T) {
  60. tests := []int{3, 5, 9}
  61. for _, tt := range tests {
  62. es, hs := buildCluster(tt, false)
  63. waitCluster(t, es)
  64. for j := 0; j < tt; j++ {
  65. waitLeader(es)
  66. toKill := make(map[int64]struct{})
  67. for len(toKill) != tt/2-1 {
  68. toKill[rand.Int63n(int64(tt))] = struct{}{}
  69. }
  70. for k := range toKill {
  71. es[k].Stop()
  72. hs[k].Close()
  73. }
  74. time.Sleep(es[0].tickDuration * defaultElection * 2)
  75. waitLeader(es)
  76. for k := range toKill {
  77. c := config.New()
  78. c.DataDir = es[k].config.DataDir
  79. c.Addr = hs[k].Listener.Addr().String()
  80. id := es[k].id
  81. e, h, err := buildServer(t, c, id)
  82. if err != nil {
  83. t.Fatal(err)
  84. }
  85. es[k] = e
  86. hs[k] = h
  87. }
  88. }
  89. destoryCluster(t, es, hs)
  90. }
  91. afterTest(t)
  92. }
  93. func TestJoinThroughFollower(t *testing.T) {
  94. tests := []int{3, 4, 5, 6}
  95. for _, tt := range tests {
  96. es := make([]*Server, tt)
  97. hs := make([]*httptest.Server, tt)
  98. for i := 0; i < tt; i++ {
  99. c := config.New()
  100. if i > 0 {
  101. c.Peers = []string{hs[i-1].URL}
  102. }
  103. es[i], hs[i] = initTestServer(c, int64(i), false)
  104. }
  105. go es[0].Run()
  106. for i := 1; i < tt; i++ {
  107. go es[i].Run()
  108. waitLeader(es[:i])
  109. }
  110. waitCluster(t, es)
  111. destoryCluster(t, es, hs)
  112. }
  113. afterTest(t)
  114. }
  115. func TestClusterConfigReload(t *testing.T) {
  116. tests := []int{3, 4, 5, 6}
  117. for i, tt := range tests {
  118. es, hs := buildCluster(tt, false)
  119. waitCluster(t, es)
  120. lead, _ := waitLeader(es)
  121. conf := config.NewClusterConfig()
  122. conf.ActiveSize = 15
  123. conf.RemoveDelay = 60
  124. if err := es[lead].p.setClusterConfig(conf); err != nil {
  125. t.Fatalf("#%d: setClusterConfig err = %v", i, err)
  126. }
  127. for k := range es {
  128. es[k].Stop()
  129. hs[k].Close()
  130. }
  131. for k := range es {
  132. c := config.New()
  133. c.DataDir = es[k].config.DataDir
  134. c.Addr = hs[k].Listener.Addr().String()
  135. id := es[k].id
  136. e, h, err := buildServer(t, c, id)
  137. if err != nil {
  138. t.Fatal(err)
  139. }
  140. es[k] = e
  141. hs[k] = h
  142. }
  143. lead, _ = waitLeader(es)
  144. // wait for msgAppResp to commit all entries
  145. time.Sleep(2 * defaultHeartbeat * es[lead].tickDuration)
  146. if g := es[lead].p.clusterConfig(); !reflect.DeepEqual(g, conf) {
  147. t.Errorf("#%d: clusterConfig = %+v, want %+v", i, g, conf)
  148. }
  149. destoryCluster(t, es, hs)
  150. }
  151. afterTest(t)
  152. }
  153. func TestMultiNodeKillOne(t *testing.T) {
  154. tests := []int{5}
  155. for i, tt := range tests {
  156. es, hs := buildCluster(tt, false)
  157. waitCluster(t, es)
  158. stop := make(chan bool)
  159. go keepSetting(hs[0].URL, stop)
  160. for j := 0; j < 10; j++ {
  161. idx := rand.Int() % tt
  162. es[idx].Stop()
  163. hs[idx].Close()
  164. c := config.New()
  165. c.DataDir = es[idx].config.DataDir
  166. c.Addr = hs[idx].Listener.Addr().String()
  167. id := es[idx].id
  168. e, h, err := buildServer(t, c, id)
  169. if err != nil {
  170. t.Fatalf("#%d.%d: %v", i, j, err)
  171. }
  172. es[idx] = e
  173. hs[idx] = h
  174. }
  175. stop <- true
  176. <-stop
  177. destoryCluster(t, es, hs)
  178. }
  179. afterTest(t)
  180. }
  181. func TestMultiNodeKillAllAndRecovery(t *testing.T) {
  182. tests := []int{5}
  183. for i, tt := range tests {
  184. es, hs := buildCluster(tt, false)
  185. waitCluster(t, es)
  186. waitLeader(es)
  187. c := etcd.NewClient([]string{hs[0].URL})
  188. for i := 0; i < 10; i++ {
  189. if _, err := c.Set("foo", "bar", 0); err != nil {
  190. panic(err)
  191. }
  192. }
  193. for k := range es {
  194. es[k].Stop()
  195. hs[k].Close()
  196. }
  197. for k := range es {
  198. c := config.New()
  199. c.DataDir = es[k].config.DataDir
  200. c.Addr = hs[k].Listener.Addr().String()
  201. id := es[k].id
  202. e, h, err := buildServer(t, c, id)
  203. if err != nil {
  204. t.Fatalf("#%d.%d: %v", i, k, err)
  205. }
  206. es[k] = e
  207. hs[k] = h
  208. }
  209. waitLeader(es)
  210. res, err := c.Set("foo", "bar", 0)
  211. if err != nil {
  212. t.Fatalf("#%d: set err after recovery: %v", err)
  213. }
  214. if g := res.Node.ModifiedIndex; g != 16 {
  215. t.Errorf("#%d: modifiedIndex = %d, want %d", i, g, 16)
  216. }
  217. destoryCluster(t, es, hs)
  218. }
  219. afterTest(t)
  220. }
  221. func BenchmarkEndToEndSet(b *testing.B) {
  222. es, hs := buildCluster(3, false)
  223. waitLeader(es)
  224. b.ResetTimer()
  225. for n := 0; n < b.N; n++ {
  226. _, err := es[0].p.Set("foo", false, "bar", store.Permanent)
  227. if err != nil {
  228. panic("unexpect error")
  229. }
  230. }
  231. b.StopTimer()
  232. destoryCluster(nil, es, hs)
  233. }
  234. // TODO(yichengq): cannot handle previous msgDenial correctly now
  235. func TestModeSwitch(t *testing.T) {
  236. t.Skip("not passed")
  237. size := 5
  238. round := 3
  239. for i := 0; i < size; i++ {
  240. es, hs := buildCluster(size, false)
  241. waitCluster(t, es)
  242. config := config.NewClusterConfig()
  243. config.SyncInterval = 0
  244. id := int64(i)
  245. for j := 0; j < round; j++ {
  246. lead, _ := waitActiveLeader(es)
  247. // cluster only demotes follower
  248. if lead == id {
  249. continue
  250. }
  251. config.ActiveSize = size - 1
  252. if err := es[lead].p.setClusterConfig(config); err != nil {
  253. t.Fatalf("#%d: setClusterConfig err = %v", i, err)
  254. }
  255. if err := es[lead].p.remove(id); err != nil {
  256. t.Fatalf("#%d: remove err = %v", i, err)
  257. }
  258. waitMode(standbyMode, es[i])
  259. for k := 0; k < 4; k++ {
  260. if es[i].s.leader != noneId {
  261. break
  262. }
  263. time.Sleep(20 * time.Millisecond)
  264. }
  265. if g := es[i].s.leader; g != lead {
  266. t.Errorf("#%d: lead = %d, want %d", i, g, lead)
  267. }
  268. config.ActiveSize = size
  269. if err := es[lead].p.setClusterConfig(config); err != nil {
  270. t.Fatalf("#%d: setClusterConfig err = %v", i, err)
  271. }
  272. waitMode(participantMode, es[i])
  273. if err := checkParticipant(i, es); err != nil {
  274. t.Errorf("#%d: check alive err = %v", i, err)
  275. }
  276. }
  277. destoryCluster(t, es, hs)
  278. }
  279. afterTest(t)
  280. }
  281. // Sending set commands
  282. func keepSetting(urlStr string, stop chan bool) {
  283. stopSet := false
  284. i := 0
  285. c := etcd.NewClient([]string{urlStr})
  286. for {
  287. key := fmt.Sprintf("%s_%v", "foo", i)
  288. result, err := c.Set(key, "bar", 0)
  289. if err != nil || result.Node.Key != "/"+key || result.Node.Value != "bar" {
  290. select {
  291. case <-stop:
  292. stopSet = true
  293. default:
  294. }
  295. }
  296. select {
  297. case <-stop:
  298. stopSet = true
  299. default:
  300. }
  301. if stopSet {
  302. break
  303. }
  304. i++
  305. }
  306. stop <- true
  307. }
  308. type leadterm struct {
  309. lead int64
  310. term int64
  311. }
  312. func waitActiveLeader(es []*Server) (lead, term int64) {
  313. for {
  314. if l, t := waitLeader(es); l >= 0 && es[l].mode.Get() == participantMode {
  315. return l, t
  316. }
  317. }
  318. }
  319. // waitLeader waits until all alive servers are checked to have the same leader.
  320. // WARNING: The lead returned is not guaranteed to be actual leader.
  321. func waitLeader(es []*Server) (lead, term int64) {
  322. for {
  323. ls := make([]leadterm, 0, len(es))
  324. for i := range es {
  325. switch es[i].mode.Get() {
  326. case participantMode:
  327. ls = append(ls, getLead(es[i]))
  328. case standbyMode:
  329. //TODO(xiangli) add standby support
  330. case stopMode:
  331. }
  332. }
  333. if isSameLead(ls) {
  334. return ls[0].lead, ls[0].term
  335. }
  336. time.Sleep(es[0].tickDuration * defaultElection)
  337. }
  338. }
  339. func getLead(s *Server) leadterm {
  340. return leadterm{s.p.node.Leader(), s.p.node.Term()}
  341. }
  342. func isSameLead(ls []leadterm) bool {
  343. m := make(map[leadterm]int)
  344. for i := range ls {
  345. m[ls[i]] = m[ls[i]] + 1
  346. }
  347. if len(m) == 1 {
  348. if ls[0].lead == -1 {
  349. return false
  350. }
  351. return true
  352. }
  353. // todo(xiangli): printout the current cluster status for debugging....
  354. return false
  355. }