etcd_functional_test.go

/*
Copyright 2014 CoreOS Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package etcd

import (
	"fmt"
	"math/rand"
	"net/http/httptest"
	"net/url"
	"reflect"
	"testing"
	"time"

	"github.com/coreos/etcd/conf"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
)
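
// TestKillLeader repeatedly kills the current leader, verifies that a
// different leader is elected, and then restarts the killed node.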
func TestKillLeader(t *testing.T) {
	defer afterTest(t)
	tests := []int{3, 5, 9}
	for i, tt := range tests {
		es, hs := buildCluster(tt, false)
		waitCluster(t, es)

		var totalTime time.Duration
		for j := 0; j < tt; j++ {
			lead, _ := waitLeader(es)
			es[lead].Stop()
			hs[lead].Close()
			time.Sleep(es[0].tickDuration * defaultElection * 2)

			start := time.Now()
			if g, _ := waitLeader(es); g == lead {
				t.Errorf("#%d.%d: lead = %d, want not %d", i, j, g, lead)
			}
			take := time.Since(start)
			totalTime += take
			avgTime := totalTime / time.Duration(j+1)
			fmt.Println("Total time:", totalTime, "; Avg time:", avgTime)

			c := newTestConfig()
			c.DataDir = es[lead].cfg.DataDir
			c.Addr = hs[lead].Listener.Addr().String()
			id := es[lead].id
			e, h := newUnstartedTestServer(c, id, false)
			err := startServer(t, e)
			if err != nil {
				t.Fatalf("#%d.%d: %v", i, j, err)
			}
			es[lead] = e
			hs[lead] = h
		}
		destoryCluster(t, es, hs)
	}
}
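
// TestKillRandom kills a random set of fewer than half of the nodes, waits
// for the remaining nodes to settle on a leader, and then restarts the
// killed nodes.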
func TestKillRandom(t *testing.T) {
	defer afterTest(t)
	tests := []int{3, 5, 9}
	for _, tt := range tests {
		es, hs := buildCluster(tt, false)
		waitCluster(t, es)

		for j := 0; j < tt; j++ {
			waitLeader(es)

			toKill := make(map[int64]struct{})
			for len(toKill) != tt/2-1 {
				toKill[rand.Int63n(int64(tt))] = struct{}{}
			}
			for k := range toKill {
				es[k].Stop()
				hs[k].Close()
			}
			time.Sleep(es[0].tickDuration * defaultElection * 2)

			waitLeader(es)

			for k := range toKill {
				c := newTestConfig()
				c.DataDir = es[k].cfg.DataDir
				c.Addr = hs[k].Listener.Addr().String()
				id := es[k].id
				e, h := newUnstartedTestServer(c, id, false)
				err := startServer(t, e)
				if err != nil {
					t.Fatal(err)
				}
				es[k] = e
				hs[k] = h
			}
		}
		destoryCluster(t, es, hs)
	}
}
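
// TestJoinThroughFollower starts nodes one by one, pointing each new node at
// the previously started one, and verifies that the full cluster forms.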
func TestJoinThroughFollower(t *testing.T) {
	defer afterTest(t)
	tests := []int{3, 4, 5, 6}
	for _, tt := range tests {
		es := make([]*Server, tt)
		hs := make([]*httptest.Server, tt)
		for i := 0; i < tt; i++ {
			c := newTestConfig()
			if i > 0 {
				c.Peers = []string{hs[i-1].URL}
			}
			es[i], hs[i] = newUnstartedTestServer(c, int64(i), false)
		}

		go es[0].Run()
		for i := 1; i < tt; i++ {
			go es[i].Run()
			waitLeader(es[:i])
		}
		waitCluster(t, es)
		destoryCluster(t, es, hs)
	}
}
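
// TestClusterConfigReload updates the cluster configuration, restarts every
// node, and verifies that the new configuration survives the restart.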
func TestClusterConfigReload(t *testing.T) {
	defer afterTest(t)
	tests := []int{3, 4, 5, 6}
	for i, tt := range tests {
		es, hs := buildCluster(tt, false)
		waitCluster(t, es)

		lead, _ := waitLeader(es)
		cc := conf.NewClusterConfig()
		cc.ActiveSize = 15
		cc.RemoveDelay = 60
		if err := es[lead].p.setClusterConfig(cc); err != nil {
			t.Fatalf("#%d: setClusterConfig err = %v", i, err)
		}

		for k := range es {
			es[k].Stop()
			hs[k].Close()
		}
		for k := range es {
			c := newTestConfig()
			c.DataDir = es[k].cfg.DataDir
			c.Addr = hs[k].Listener.Addr().String()
			id := es[k].id
			e, h := newUnstartedTestServer(c, id, false)
			err := startServer(t, e)
			if err != nil {
				t.Fatal(err)
			}
			es[k] = e
			hs[k] = h
		}

		lead, _ = waitLeader(es)
		// wait for msgAppResp to commit all entries
		time.Sleep(2 * defaultHeartbeat * es[lead].tickDuration)
		if g := es[lead].p.clusterConfig(); !reflect.DeepEqual(g, cc) {
			t.Errorf("#%d: clusterConfig = %+v, want %+v", i, g, cc)
		}
		destoryCluster(t, es, hs)
	}
}
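
// TestMultiNodeKillOne repeatedly kills and restarts a random node while set
// requests keep being sent to the cluster.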
func TestMultiNodeKillOne(t *testing.T) {
	defer afterTest(t)
	tests := []int{5}
	for i, tt := range tests {
		es, hs := buildCluster(tt, false)
		waitCluster(t, es)

		stop := make(chan bool)
		go keepSetting(hs[0].URL, stop)

		for j := 0; j < 10; j++ {
			idx := rand.Int() % tt
			es[idx].Stop()
			hs[idx].Close()

			c := newTestConfig()
			c.DataDir = es[idx].cfg.DataDir
			c.Addr = hs[idx].Listener.Addr().String()
			id := es[idx].id
			e, h := newUnstartedTestServer(c, id, false)
			err := startServer(t, e)
			if err != nil {
				t.Fatalf("#%d.%d: %v", i, j, err)
			}
			es[idx] = e
			hs[idx] = h
		}

		stop <- true
		<-stop
		destoryCluster(t, es, hs)
	}
}
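
// TestMultiNodeKillAllAndRecovery writes some entries, stops and restarts
// every node, and verifies that the cluster recovers and continues to serve
// writes at the expected index.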
func TestMultiNodeKillAllAndRecovery(t *testing.T) {
	defer afterTest(t)
	tests := []int{5}
	for i, tt := range tests {
		es, hs := buildCluster(tt, false)
		waitCluster(t, es)
		waitLeader(es)

		c := etcd.NewClient([]string{hs[0].URL})
		for j := 0; j < 10; j++ {
			if _, err := c.Set("foo", "bar", 0); err != nil {
				panic(err)
			}
		}

		for k := range es {
			es[k].Stop()
			hs[k].Close()
		}
		for k := range es {
			cfg := newTestConfig()
			cfg.DataDir = es[k].cfg.DataDir
			cfg.Addr = hs[k].Listener.Addr().String()
			id := es[k].id
			e, h := newUnstartedTestServer(cfg, id, false)
			err := startServer(t, e)
			if err != nil {
				t.Fatalf("#%d.%d: %v", i, k, err)
			}
			es[k] = e
			hs[k] = h
		}

		waitLeader(es)
		res, err := c.Set("foo", "bar", 0)
		if err != nil {
			t.Fatalf("#%d: set err after recovery: %v", i, err)
		}
		if g := res.Node.ModifiedIndex; g != 16 {
			t.Errorf("#%d: modifiedIndex = %d, want %d", i, g, 16)
		}
		destoryCluster(t, es, hs)
	}
}
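
// BenchmarkEndToEndSet measures the time to apply set commands through the
// participant of the first node in a 3-node cluster.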
func BenchmarkEndToEndSet(b *testing.B) {
	es, hs := buildCluster(3, false)
	waitLeader(es)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_, err := es[0].p.Set("foo", false, "bar", store.Permanent)
		if err != nil {
			panic("unexpected error")
		}
	}
	b.StopTimer()
	destoryCluster(nil, es, hs)
}

// TestModeSwitch tests switching a node between standby and peer mode.
func TestModeSwitch(t *testing.T) {
	t.Skip("not implemented")
}

// keepSetting keeps sending set commands to the given URL until it receives
// a signal on stop, then acknowledges the signal and returns.
func keepSetting(urlStr string, stop chan bool) {
	tc := NewTestClient()
	i := 0
	value := url.Values(map[string][]string{"value": {"bar"}})
	for {
		resp, err := tc.PutForm(fmt.Sprintf("%s/v2/keys/foo_%v", urlStr, i), value)
		if err == nil {
			tc.ReadBody(resp)
		}
		select {
		case <-stop:
			stop <- true
			return
		default:
		}
		i++
	}
}

// leadterm records the leader and term observed on a single server.
type leadterm struct {
	lead int64
	term int64
}

// waitActiveLeader waits until a leader is agreed upon and that leader is
// running in participant mode, then returns its index and term.
func waitActiveLeader(es []*Server) (lead, term int64) {
	for {
		if l, t := waitLeader(es); l >= 0 && es[l].mode.Get() == participantMode {
			return l, t
		}
	}
}

// waitLeader waits until all alive servers are observed to agree on the same
// leader.
// WARNING: The lead returned is not guaranteed to be the actual leader.
func waitLeader(es []*Server) (lead, term int64) {
	for {
		ls := make([]leadterm, 0, len(es))
		for i := range es {
			switch es[i].mode.Get() {
			case participantMode:
				ls = append(ls, getLead(es[i]))
			case standbyMode:
				// TODO(xiangli): add standby support
			case stopMode:
			}
		}
		if isSameLead(ls) {
			return ls[0].lead, ls[0].term
		}
		time.Sleep(es[0].tickDuration * defaultElection)
	}
}

// getLead returns the leader and term currently seen by the given server.
func getLead(s *Server) leadterm {
	return leadterm{s.p.node.Leader(), s.p.node.Term()}
}

// isSameLead reports whether every observation agrees on a valid leader.
func isSameLead(ls []leadterm) bool {
	m := make(map[leadterm]int)
	for i := range ls {
		m[ls[i]] = m[ls[i]] + 1
	}
	if len(m) == 1 {
		if ls[0].lead == -1 {
			return false
		}
		return true
	}
	// TODO(xiangli): print out the current cluster status for debugging....
	return false
}