remove_node_test.go 5.1 KB


  1. package test
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "net/http"
  7. "os"
  8. "syscall"
  9. "testing"
  10. "time"
  11. "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
  12. "github.com/coreos/etcd/tests"
  13. "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
  14. )
  15. // remove the node and node rejoin with previous log
  16. func TestRemoveNode(t *testing.T) {
  17. procAttr := new(os.ProcAttr)
  18. procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
  19. clusterSize := 4
  20. argGroup, etcds, _ := CreateCluster(clusterSize, procAttr, false)
  21. defer DestroyCluster(etcds)
  22. time.Sleep(time.Second)
  23. c := etcd.NewClient(nil)
  24. c.SyncCluster()
  25. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "syncInterval":1}`))
  26. if !assert.Equal(t, resp.StatusCode, 200) {
  27. t.FailNow()
  28. }
  29. rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)
  30. client := &http.Client{}
  31. for i := 0; i < 2; i++ {
  32. for i := 0; i < 2; i++ {
  33. r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`))
  34. if !assert.Equal(t, r.StatusCode, 200) {
  35. t.FailNow()
  36. }
  37. client.Do(rmReq)
  38. fmt.Println("send remove to node3 and wait for its exiting")
  39. time.Sleep(100 * time.Millisecond)
  40. resp, err := c.Get("_etcd/machines", false, false)
  41. if err != nil {
  42. panic(err)
  43. }
  44. if len(resp.Node.Nodes) != 3 {
  45. t.Fatal("cannot remove peer")
  46. }
  47. etcds[2].Kill()
  48. etcds[2].Wait()
  49. if i == 1 {
  50. // rejoin with log
  51. etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr)
  52. } else {
  53. // rejoin without log
  54. etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
  55. }
  56. if err != nil {
  57. panic(err)
  58. }
  59. r, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4}`))
  60. if !assert.Equal(t, r.StatusCode, 200) {
  61. t.FailNow()
  62. }
  63. time.Sleep(time.Second + time.Second)
  64. resp, err = c.Get("_etcd/machines", false, false)
  65. if err != nil {
  66. panic(err)
  67. }
  68. if len(resp.Node.Nodes) != 4 {
  69. t.Fatalf("add peer fails #1 (%d != 4)", len(resp.Node.Nodes))
  70. }
  71. }
  72. // first kill the node, then remove it, then add it back
  73. for i := 0; i < 2; i++ {
  74. r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`))
  75. if !assert.Equal(t, r.StatusCode, 200) {
  76. t.FailNow()
  77. }
  78. etcds[2].Kill()
  79. fmt.Println("kill node3 and wait for its exiting")
  80. etcds[2].Wait()
  81. client.Do(rmReq)
  82. time.Sleep(100 * time.Millisecond)
  83. resp, err := c.Get("_etcd/machines", false, false)
  84. if err != nil {
  85. panic(err)
  86. }
  87. if len(resp.Node.Nodes) != 3 {
  88. t.Fatal("cannot remove peer")
  89. }
  90. if i == 1 {
  91. // rejoin with log
  92. etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2]), procAttr)
  93. } else {
  94. // rejoin without log
  95. etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
  96. }
  97. if err != nil {
  98. panic(err)
  99. }
  100. r, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4}`))
  101. if !assert.Equal(t, r.StatusCode, 200) {
  102. t.FailNow()
  103. }
  104. time.Sleep(time.Second + time.Second)
  105. resp, err = c.Get("_etcd/machines", false, false)
  106. if err != nil {
  107. panic(err)
  108. }
  109. if len(resp.Node.Nodes) != 4 {
  110. t.Fatalf("add peer fails #2 (%d != 4)", len(resp.Node.Nodes))
  111. }
  112. }
  113. }
  114. }
  115. func TestRemovePausedNode(t *testing.T) {
  116. procAttr := new(os.ProcAttr)
  117. procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
  118. clusterSize := 4
  119. _, etcds, _ := CreateCluster(clusterSize, procAttr, false)
  120. defer DestroyCluster(etcds)
  121. time.Sleep(time.Second)
  122. c := etcd.NewClient(nil)
  123. c.SyncCluster()
  124. r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":1, "syncInterval":1}`))
  125. if !assert.Equal(t, r.StatusCode, 200) {
  126. t.FailNow()
  127. }
  128. // Wait for standby instances to update its cluster config
  129. time.Sleep(6 * time.Second)
  130. resp, err := c.Get("_etcd/machines", false, false)
  131. if err != nil {
  132. panic(err)
  133. }
  134. if len(resp.Node.Nodes) != 3 {
  135. t.Fatal("cannot remove peer")
  136. }
  137. for i := 0; i < clusterSize; i++ {
  138. // first pause the node, then remove it, then resume it
  139. idx := rand.Int() % clusterSize
  140. etcds[idx].Signal(syscall.SIGSTOP)
  141. fmt.Printf("pause node%d and let standby node take its place\n", idx+1)
  142. time.Sleep(4 * time.Second)
  143. etcds[idx].Signal(syscall.SIGCONT)
  144. // let it change its state to candidate at least
  145. time.Sleep(time.Second)
  146. stop := make(chan bool)
  147. leaderChan := make(chan string, 1)
  148. all := make(chan bool, 1)
  149. go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
  150. <-all
  151. <-leaderChan
  152. stop <- true
  153. resp, err = c.Get("_etcd/machines", false, false)
  154. if err != nil {
  155. panic(err)
  156. }
  157. if len(resp.Node.Nodes) != 3 {
  158. t.Fatalf("add peer fails (%d != 3)", len(resp.Node.Nodes))
  159. }
  160. for i := 0; i < 3; i++ {
  161. if resp.Node.Nodes[i].Key == fmt.Sprintf("node%d", idx+1) {
  162. t.Fatal("node should be removed")
  163. }
  164. }
  165. }
  166. }