cluster_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "os"
  20. "strconv"
  21. "testing"
  22. "github.com/coreos/etcd/client"
  23. "github.com/coreos/etcd/pkg/testutil"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  25. )
  26. func init() {
  27. // open microsecond-level time log for integration test debugging
  28. log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile)
  29. if t := os.Getenv("ETCD_ELECTION_TIMEOUT_TICKS"); t != "" {
  30. if i, err := strconv.ParseInt(t, 10, 64); err == nil {
  31. electionTicks = int(i)
  32. }
  33. }
  34. }
  35. func TestClusterOf1(t *testing.T) { testCluster(t, 1) }
  36. func TestClusterOf3(t *testing.T) { testCluster(t, 3) }
  37. func testCluster(t *testing.T, size int) {
  38. defer testutil.AfterTest(t)
  39. c := NewCluster(t, size)
  40. c.Launch(t)
  41. defer c.Terminate(t)
  42. clusterMustProgress(t, c.Members)
  43. }
  44. func TestTLSClusterOf3(t *testing.T) {
  45. defer testutil.AfterTest(t)
  46. c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
  47. c.Launch(t)
  48. defer c.Terminate(t)
  49. clusterMustProgress(t, c.Members)
  50. }
  51. func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
  52. func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }
  53. func testClusterUsingDiscovery(t *testing.T, size int) {
  54. defer testutil.AfterTest(t)
  55. dc := NewCluster(t, 1)
  56. dc.Launch(t)
  57. defer dc.Terminate(t)
  58. // init discovery token space
  59. dcc := mustNewHTTPClient(t, dc.URLs(), nil)
  60. dkapi := client.NewKeysAPI(dcc)
  61. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  62. if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
  63. t.Fatal(err)
  64. }
  65. cancel()
  66. c := NewClusterByConfig(
  67. t,
  68. &ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"},
  69. )
  70. c.Launch(t)
  71. defer c.Terminate(t)
  72. clusterMustProgress(t, c.Members)
  73. }
  74. func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
  75. defer testutil.AfterTest(t)
  76. dc := NewCluster(t, 1)
  77. dc.Launch(t)
  78. defer dc.Terminate(t)
  79. // init discovery token space
  80. dcc := mustNewHTTPClient(t, dc.URLs(), nil)
  81. dkapi := client.NewKeysAPI(dcc)
  82. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  83. if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
  84. t.Fatal(err)
  85. }
  86. cancel()
  87. c := NewClusterByConfig(t,
  88. &ClusterConfig{
  89. Size: 3,
  90. PeerTLS: &testTLSInfo,
  91. DiscoveryURL: dc.URL(0) + "/v2/keys"},
  92. )
  93. c.Launch(t)
  94. defer c.Terminate(t)
  95. clusterMustProgress(t, c.Members)
  96. }
  97. func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
  98. func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }
  99. func testDoubleClusterSize(t *testing.T, size int) {
  100. defer testutil.AfterTest(t)
  101. c := NewCluster(t, size)
  102. c.Launch(t)
  103. defer c.Terminate(t)
  104. for i := 0; i < size; i++ {
  105. c.AddMember(t)
  106. }
  107. clusterMustProgress(t, c.Members)
  108. }
  109. func TestDoubleTLSClusterSizeOf3(t *testing.T) {
  110. defer testutil.AfterTest(t)
  111. c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
  112. c.Launch(t)
  113. defer c.Terminate(t)
  114. for i := 0; i < 3; i++ {
  115. c.AddMember(t)
  116. }
  117. clusterMustProgress(t, c.Members)
  118. }
  119. func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
  120. func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }
  121. func testDecreaseClusterSize(t *testing.T, size int) {
  122. defer testutil.AfterTest(t)
  123. c := NewCluster(t, size)
  124. c.Launch(t)
  125. defer c.Terminate(t)
  126. // TODO: remove the last but one member
  127. for i := 0; i < size-1; i++ {
  128. id := c.Members[len(c.Members)-1].s.ID()
  129. c.RemoveMember(t, uint64(id))
  130. c.waitLeader(t, c.Members)
  131. }
  132. clusterMustProgress(t, c.Members)
  133. }
  134. func TestForceNewCluster(t *testing.T) {
  135. c := NewCluster(t, 3)
  136. c.Launch(t)
  137. cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
  138. kapi := client.NewKeysAPI(cc)
  139. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  140. resp, err := kapi.Create(ctx, "/foo", "bar")
  141. if err != nil {
  142. t.Fatalf("unexpected create error: %v", err)
  143. }
  144. cancel()
  145. // ensure create has been applied in this machine
  146. ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
  147. if _, err = kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
  148. t.Fatalf("unexpected watch error: %v", err)
  149. }
  150. cancel()
  151. c.Members[0].Stop(t)
  152. c.Members[1].Terminate(t)
  153. c.Members[2].Terminate(t)
  154. c.Members[0].ForceNewCluster = true
  155. err = c.Members[0].Restart(t)
  156. if err != nil {
  157. t.Fatalf("unexpected ForceRestart error: %v", err)
  158. }
  159. defer c.Members[0].Terminate(t)
  160. c.waitLeader(t, c.Members[:1])
  161. // use new http client to init new connection
  162. cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
  163. kapi = client.NewKeysAPI(cc)
  164. // ensure force restart keep the old data, and new cluster can make progress
  165. ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
  166. if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
  167. t.Fatalf("unexpected watch error: %v", err)
  168. }
  169. cancel()
  170. clusterMustProgress(t, c.Members[:1])
  171. }
  172. func TestAddMemberAfterClusterFullRotation(t *testing.T) {
  173. defer testutil.AfterTest(t)
  174. c := NewCluster(t, 3)
  175. c.Launch(t)
  176. defer c.Terminate(t)
  177. // remove all the previous three members and add in three new members.
  178. for i := 0; i < 3; i++ {
  179. c.RemoveMember(t, uint64(c.Members[0].s.ID()))
  180. c.waitLeader(t, c.Members)
  181. c.AddMember(t)
  182. c.waitLeader(t, c.Members)
  183. }
  184. c.AddMember(t)
  185. c.waitLeader(t, c.Members)
  186. clusterMustProgress(t, c.Members)
  187. }
  188. // Ensure we can remove a member then add a new one back immediately.
  189. func TestIssue2681(t *testing.T) {
  190. defer testutil.AfterTest(t)
  191. c := NewCluster(t, 5)
  192. c.Launch(t)
  193. defer c.Terminate(t)
  194. c.RemoveMember(t, uint64(c.Members[4].s.ID()))
  195. c.waitLeader(t, c.Members)
  196. c.AddMember(t)
  197. c.waitLeader(t, c.Members)
  198. clusterMustProgress(t, c.Members)
  199. }
  200. // Ensure we can remove a member after a snapshot then add a new one back.
  201. func TestIssue2746(t *testing.T) { testIssue2746(t, 5) }
  202. // With 3 nodes TestIssue2476 sometimes had a shutdown with an inflight snapshot.
  203. func TestIssue2746WithThree(t *testing.T) { testIssue2746(t, 3) }
  204. func testIssue2746(t *testing.T, members int) {
  205. defer testutil.AfterTest(t)
  206. c := NewCluster(t, members)
  207. for _, m := range c.Members {
  208. m.SnapCount = 10
  209. }
  210. c.Launch(t)
  211. defer c.Terminate(t)
  212. // force a snapshot
  213. for i := 0; i < 20; i++ {
  214. clusterMustProgress(t, c.Members)
  215. }
  216. c.RemoveMember(t, uint64(c.Members[members-1].s.ID()))
  217. c.waitLeader(t, c.Members)
  218. c.AddMember(t)
  219. c.waitLeader(t, c.Members)
  220. clusterMustProgress(t, c.Members)
  221. }
  222. // Ensure etcd will not panic when removing a just started member.
  223. func TestIssue2904(t *testing.T) {
  224. defer testutil.AfterTest(t)
  225. // start 1-member cluster to ensure member 0 is the leader of the cluster.
  226. c := NewCluster(t, 1)
  227. c.Launch(t)
  228. defer c.Terminate(t)
  229. c.AddMember(t)
  230. c.Members[1].Stop(t)
  231. // send remove member-1 request to the cluster.
  232. cc := mustNewHTTPClient(t, c.URLs(), nil)
  233. ma := client.NewMembersAPI(cc)
  234. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  235. // the proposal is not committed because member 1 is stopped, but the
  236. // proposal is appended to leader's raft log.
  237. ma.Remove(ctx, c.Members[1].s.ID().String())
  238. cancel()
  239. // restart member, and expect it to send UpdateAttributes request.
  240. // the log in the leader is like this:
  241. // [..., remove 1, ..., update attr 1, ...]
  242. c.Members[1].Restart(t)
  243. // when the member comes back, it ack the proposal to remove itself,
  244. // and apply it.
  245. <-c.Members[1].s.StopNotify()
  246. // terminate removed member
  247. c.Members[1].Terminate(t)
  248. c.Members = c.Members[:1]
  249. // wait member to be removed.
  250. c.waitMembersMatch(t, c.HTTPMembers())
  251. }
  252. // TestIssue3699 tests minority failure during cluster configuration; it was
  253. // deadlocking.
  254. func TestIssue3699(t *testing.T) {
  255. // start a cluster of 3 nodes a, b, c
  256. defer testutil.AfterTest(t)
  257. c := NewCluster(t, 3)
  258. c.Launch(t)
  259. defer c.Terminate(t)
  260. // make node a unavailable
  261. c.Members[0].Stop(t)
  262. <-c.Members[0].s.StopNotify()
  263. // add node d
  264. c.AddMember(t)
  265. // electing node d as leader makes node a unable to participate
  266. leaderID := c.waitLeader(t, c.Members)
  267. for leaderID != 3 {
  268. c.Members[leaderID].Stop(t)
  269. <-c.Members[leaderID].s.StopNotify()
  270. c.Members[leaderID].Restart(t)
  271. leaderID = c.waitLeader(t, c.Members)
  272. }
  273. // bring back node a
  274. // node a will remain useless as long as d is the leader.
  275. err := c.Members[0].Restart(t)
  276. select {
  277. case <-c.Members[0].s.StopNotify():
  278. t.Fatalf("should not be stopped")
  279. default:
  280. }
  281. // must waitLeader so goroutines don't leak on terminate
  282. leaderID = c.waitLeader(t, c.Members)
  283. // try to participate in cluster
  284. cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
  285. kapi := client.NewKeysAPI(cc)
  286. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  287. _, err = kapi.Set(ctx, "/foo", "bar", nil)
  288. cancel()
  289. if err != nil {
  290. t.Fatalf("unexpected error on Set (%v)", err)
  291. }
  292. }
  293. // clusterMustProgress ensures that cluster can make progress. It creates
  294. // a random key first, and check the new key could be got from all client urls
  295. // of the cluster.
  296. func clusterMustProgress(t *testing.T, membs []*member) {
  297. cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil)
  298. kapi := client.NewKeysAPI(cc)
  299. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  300. key := fmt.Sprintf("foo%d", rand.Int())
  301. resp, err := kapi.Create(ctx, "/"+key, "bar")
  302. if err != nil {
  303. t.Fatalf("create on %s error: %v", membs[0].URL(), err)
  304. }
  305. cancel()
  306. for i, m := range membs {
  307. u := m.URL()
  308. mcc := mustNewHTTPClient(t, []string{u}, nil)
  309. mkapi := client.NewKeysAPI(mcc)
  310. mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
  311. if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
  312. t.Fatalf("#%d: watch on %s error: %v", i, u, err)
  313. }
  314. mcancel()
  315. }
  316. }