remove_node_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package test
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "net/http"
  7. "os"
  8. "syscall"
  9. "testing"
  10. "time"
  11. "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
  12. "github.com/coreos/etcd/tests"
  13. "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
  14. )
  15. // remove the node and node rejoin with previous log
  16. func TestRemoveNode(t *testing.T) {
  17. procAttr := new(os.ProcAttr)
  18. procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
  19. clusterSize := 4
  20. argGroup, etcds, _ := CreateCluster(clusterSize, procAttr, false)
  21. defer DestroyCluster(etcds)
  22. time.Sleep(time.Second)
  23. c := etcd.NewClient(nil)
  24. c.SyncCluster()
  25. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "syncInterval":5}`))
  26. if !assert.Equal(t, resp.StatusCode, 200) {
  27. t.FailNow()
  28. }
  29. rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)
  30. client := &http.Client{}
  31. for i := 0; i < 2; i++ {
  32. for i := 0; i < 2; i++ {
  33. client.Do(rmReq)
  34. fmt.Println("send remove to node3 and wait for its exiting")
  35. time.Sleep(100 * time.Millisecond)
  36. resp, err := c.Get("_etcd/machines", false, false)
  37. if err != nil {
  38. panic(err)
  39. }
  40. if len(resp.Node.Nodes) != 3 {
  41. t.Fatal("cannot remove peer")
  42. }
  43. etcds[2].Kill()
  44. etcds[2].Wait()
  45. if i == 1 {
  46. // rejoin with log
  47. etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr)
  48. } else {
  49. // rejoin without log
  50. etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
  51. }
  52. if err != nil {
  53. panic(err)
  54. }
  55. time.Sleep(time.Second + 5*time.Second)
  56. resp, err = c.Get("_etcd/machines", false, false)
  57. if err != nil {
  58. panic(err)
  59. }
  60. if len(resp.Node.Nodes) != 4 {
  61. t.Fatalf("add peer fails #1 (%d != 4)", len(resp.Node.Nodes))
  62. }
  63. }
  64. // first kill the node, then remove it, then add it back
  65. for i := 0; i < 2; i++ {
  66. etcds[2].Kill()
  67. fmt.Println("kill node3 and wait for its exiting")
  68. etcds[2].Wait()
  69. client.Do(rmReq)
  70. time.Sleep(100 * time.Millisecond)
  71. resp, err := c.Get("_etcd/machines", false, false)
  72. if err != nil {
  73. panic(err)
  74. }
  75. if len(resp.Node.Nodes) != 3 {
  76. t.Fatal("cannot remove peer")
  77. }
  78. if i == 1 {
  79. // rejoin with log
  80. etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2]), procAttr)
  81. } else {
  82. // rejoin without log
  83. etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
  84. }
  85. if err != nil {
  86. panic(err)
  87. }
  88. time.Sleep(time.Second + time.Second)
  89. resp, err = c.Get("_etcd/machines", false, false)
  90. if err != nil {
  91. panic(err)
  92. }
  93. if len(resp.Node.Nodes) != 4 {
  94. t.Fatalf("add peer fails #2 (%d != 4)", len(resp.Node.Nodes))
  95. }
  96. }
  97. }
  98. }
  99. func TestRemovePausedNode(t *testing.T) {
  100. procAttr := new(os.ProcAttr)
  101. procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
  102. clusterSize := 4
  103. _, etcds, _ := CreateCluster(clusterSize, procAttr, false)
  104. defer DestroyCluster(etcds)
  105. time.Sleep(time.Second)
  106. c := etcd.NewClient(nil)
  107. c.SyncCluster()
  108. r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":1, "syncInterval":1}`))
  109. if !assert.Equal(t, r.StatusCode, 200) {
  110. t.FailNow()
  111. }
  112. // Wait for standby instances to update its cluster config
  113. time.Sleep(6 * time.Second)
  114. resp, err := c.Get("_etcd/machines", false, false)
  115. if err != nil {
  116. panic(err)
  117. }
  118. if len(resp.Node.Nodes) != 3 {
  119. t.Fatal("cannot remove peer")
  120. }
  121. for i := 0; i < clusterSize; i++ {
  122. // first pause the node, then remove it, then resume it
  123. idx := rand.Int() % clusterSize
  124. etcds[idx].Signal(syscall.SIGSTOP)
  125. fmt.Printf("pause node%d and let standby node take its place\n", idx+1)
  126. time.Sleep(4 * time.Second)
  127. etcds[idx].Signal(syscall.SIGCONT)
  128. // let it change its state to candidate at least
  129. time.Sleep(time.Second)
  130. stop := make(chan bool)
  131. leaderChan := make(chan string, 1)
  132. all := make(chan bool, 1)
  133. go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
  134. <-all
  135. <-leaderChan
  136. stop <- true
  137. resp, err = c.Get("_etcd/machines", false, false)
  138. if err != nil {
  139. panic(err)
  140. }
  141. if len(resp.Node.Nodes) != 3 {
  142. t.Fatalf("add peer fails (%d != 3)", len(resp.Node.Nodes))
  143. }
  144. for i := 0; i < 3; i++ {
  145. if resp.Node.Nodes[i].Key == fmt.Sprintf("node%d", idx+1) {
  146. t.Fatal("node should be removed")
  147. }
  148. }
  149. }
  150. }