standby_test.go

package test

import (
	"bytes"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/coreos/etcd/server"
	"github.com/coreos/etcd/tests"
	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)

// Create a full cluster and then change the active size.
func TestStandby(t *testing.T) {
	clusterSize := 15
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	if !assert.NoError(t, err) {
		t.Fatal("cannot create cluster")
	}
	defer DestroyCluster(etcds)
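
	// Shorten the sync interval so standbys pick up cluster changes quickly.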
	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}
	time.Sleep(time.Second)

	c := etcd.NewClient(nil)
	c.SyncCluster()

	// Verify that only the default number of active machines (9) are registered.
	result, err := c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 9)

	t.Log("Reconfigure with a smaller active size")
	resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// Wait for two monitor cycles before checking for demotion.
	time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second))

	// Verify that we now have seven peers.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 7)

	t.Log("Test the functionality of all servers")
	// Set key.
	time.Sleep(time.Second)
	if _, err := c.Set("foo", "bar", 0); err != nil {
		panic(err)
	}
	time.Sleep(time.Second)

	// Check that all peers and standbys have the value.
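	// Standbys redirect client requests to the active cluster, so they
	// should return the value as well.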
	for i := range etcds {
		resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
		if assert.NoError(t, err) {
			body := tests.ReadBodyJSON(resp)
			if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
				assert.Equal(t, node["value"], "bar")
			}
		}
	}

	t.Log("Reconfigure with larger active size and wait for join")
	resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}
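
	// Wait for the sync interval plus a one-second buffer so a standby can join the active cluster.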
	time.Sleep((1 * time.Second) + (1 * time.Second))

	// Verify that exactly eight machines are in the cluster.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 8)
}

// Create a full cluster, disconnect a peer, wait for removal, wait for standby join.
func TestStandbyAutoJoin(t *testing.T) {
	clusterSize := 5
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	if err != nil {
		t.Fatal("cannot create cluster")
	}
	defer func() {
		// Wrap this in a closure so that it picks up the updated value of
		// the "etcds" variable after the killed peer is removed below.
		DestroyCluster(etcds)
	}()

	c := etcd.NewClient(nil)
	c.SyncCluster()
	time.Sleep(1 * time.Second)

	// Verify that we have five machines.
	result, err := c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 5)

	// Reconfigure with a short remove delay (2 seconds).
	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`))
	if !assert.Equal(t, resp.StatusCode, 200) {
		t.FailNow()
	}

	// Wait for a monitor cycle before checking for removal.
	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))

	// Verify that we now have four peers.
	result, err = c.Get("_etcd/machines", false, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 4)

	// Remove a peer and drop it from our process list so the deferred
	// DestroyCluster doesn't tear it down twice.
	victim := etcds[1]
	etcds = append(etcds[:1], etcds[2:]...)
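	// Kill the process; after removeDelay the cluster should drop node2
	// and promote a standby in its place.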
	if err := victim.Kill(); err != nil {
		panic(err.Error())
	}
	victim.Release()

	// Wait for it to get dropped.
	time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second))
	// Wait for the standby to join.
	time.Sleep((1 * time.Second) + (1 * time.Second))

	// Verify that we are back to four peers.
	result, err = c.Get("_etcd/machines", true, true)
	assert.NoError(t, err)
	assert.Equal(t, len(result.Node.Nodes), 4)

	// Verify that node2 is not one of those peers.
	_, err = c.Get("_etcd/machines/node2", false, false)
	assert.Error(t, err)
}

// Create a full cluster and then change the active size gradually.
func TestStandbyGradualChange(t *testing.T) {
	clusterSize := 9
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	defer DestroyCluster(etcds)
	if !assert.NoError(t, err) {
		t.Fatal("cannot create cluster")
	}

	time.Sleep(time.Second)
	c := etcd.NewClient(nil)
	c.SyncCluster()
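
	// Shrink the active size one machine at a time for six rounds, then grow it back the same way.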
	num := clusterSize
	for inc := 0; inc < 2; inc++ {
		for i := 0; i < 6; i++ {
			// Verify that we currently have num machines.
			result, err := c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			if inc == 0 {
				num--
			} else {
				num++
			}

			t.Log("Reconfigure with active size", num)
			resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
			if !assert.Equal(t, resp.StatusCode, 200) {
				t.FailNow()
			}

			if inc == 0 {
				// Wait for a monitor cycle before checking for demotion.
				time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
			} else {
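				// Wait for the sync interval plus a buffer so a standby can join.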
				time.Sleep(time.Second + (1 * time.Second))
			}

			// Verify that we now have num peers.
			result, err = c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			t.Log("Test the functionality of all servers")
			// Set key.
			if _, err := c.Set("foo", "bar", 0); err != nil {
				panic(err)
			}
			time.Sleep(100 * time.Millisecond)

			// Check that all peers and standbys have the value.
			for i := range etcds {
				resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
				if assert.NoError(t, err) {
					body := tests.ReadBodyJSON(resp)
					if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
						assert.Equal(t, node["value"], "bar")
					}
				}
			}
		}
	}
}

// Create a full cluster and then change the active size dramatically.
func TestStandbyDramaticChange(t *testing.T) {
	clusterSize := 9
	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
	defer DestroyCluster(etcds)
	if !assert.NoError(t, err) {
		t.Fatal("cannot create cluster")
	}

	time.Sleep(time.Second)
	c := etcd.NewClient(nil)
	c.SyncCluster()
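
	// Swing the active size down by six and back up again, three times over.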
	num := clusterSize
	for i := 0; i < 3; i++ {
		for inc := 0; inc < 2; inc++ {
			// Verify that we currently have num machines.
			result, err := c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			if inc == 0 {
				num -= 6
			} else {
				num += 6
			}

			t.Log("Reconfigure with active size", num)
			resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
			if !assert.Equal(t, resp.StatusCode, 200) {
				t.FailNow()
			}

			if inc == 0 {
				// Wait for enough monitor cycles to demote six peers.
				time.Sleep(6*server.ActiveMonitorTimeout + (1 * time.Second))
			} else {
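				// Wait for the sync interval plus a buffer so standbys can join.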
				time.Sleep(time.Second + (1 * time.Second))
			}

			// Verify that we now have num peers.
			result, err = c.Get("_etcd/machines", false, true)
			assert.NoError(t, err)
			assert.Equal(t, len(result.Node.Nodes), num)

			t.Log("Test the functionality of all servers")
			// Set key.
			if _, err := c.Set("foo", "bar", 0); err != nil {
				panic(err)
			}
			time.Sleep(100 * time.Millisecond)

			// Check that all peers and standbys have the value.
			for i := range etcds {
				resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
				if assert.NoError(t, err) {
					body := tests.ReadBodyJSON(resp)
					if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
						assert.Equal(t, node["value"], "bar")
					}
				}
			}
		}
	}
}