standby_test.go 10.0 KB


  1. // +build ignore
  2. package test
  3. import (
  4. "bytes"
  5. "fmt"
  6. "os"
  7. "testing"
  8. "time"
  9. "github.com/coreos/etcd/server"
  10. "github.com/coreos/etcd/store"
  11. "github.com/coreos/etcd/tests"
  12. "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
  13. "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
  14. )
  15. // Create a full cluster and then change the active size.
  16. func TestStandby(t *testing.T) {
  17. clusterSize := 15
  18. _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
  19. if !assert.NoError(t, err) {
  20. t.Fatal("cannot create cluster")
  21. }
  22. defer DestroyCluster(etcds)
  23. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`))
  24. if !assert.Equal(t, resp.StatusCode, 200) {
  25. t.FailNow()
  26. }
  27. time.Sleep(time.Second)
  28. c := etcd.NewClient(nil)
  29. c.SyncCluster()
  30. // Verify that we just have default machines.
  31. result, err := c.Get("_etcd/machines", false, true)
  32. assert.NoError(t, err)
  33. assert.Equal(t, len(result.Node.Nodes), 9)
  34. t.Log("Reconfigure with a smaller active size")
  35. resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncInterval":1}`))
  36. if !assert.Equal(t, resp.StatusCode, 200) {
  37. t.FailNow()
  38. }
  39. // Wait for two monitor cycles before checking for demotion.
  40. time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second))
  41. // Verify that we now have seven peers.
  42. result, err = c.Get("_etcd/machines", false, true)
  43. assert.NoError(t, err)
  44. assert.Equal(t, len(result.Node.Nodes), 7)
  45. t.Log("Test the functionality of all servers")
  46. // Set key.
  47. time.Sleep(time.Second)
  48. if _, err := c.Set("foo", "bar", 0); err != nil {
  49. panic(err)
  50. }
  51. time.Sleep(time.Second)
  52. // Check that all peers and standbys have the value.
  53. for i := range etcds {
  54. resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
  55. if assert.NoError(t, err) {
  56. body := tests.ReadBodyJSON(resp)
  57. if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
  58. assert.Equal(t, node["value"], "bar")
  59. }
  60. }
  61. }
  62. t.Log("Reconfigure with larger active size and wait for join")
  63. resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncInterval":1}`))
  64. if !assert.Equal(t, resp.StatusCode, 200) {
  65. t.FailNow()
  66. }
  67. time.Sleep((1 * time.Second) + (1 * time.Second))
  68. // Verify that exactly eight machines are in the cluster.
  69. result, err = c.Get("_etcd/machines", false, true)
  70. assert.NoError(t, err)
  71. assert.Equal(t, len(result.Node.Nodes), 8)
  72. }
  73. // Create a full cluster, disconnect a peer, wait for removal, wait for standby join.
  74. func TestStandbyAutoJoin(t *testing.T) {
  75. clusterSize := 5
  76. _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
  77. if err != nil {
  78. t.Fatal("cannot create cluster")
  79. }
  80. defer func() {
  81. // Wrap this in a closure so that it picks up the updated version of
  82. // the "etcds" variable.
  83. DestroyCluster(etcds)
  84. }()
  85. c := etcd.NewClient(nil)
  86. c.SyncCluster()
  87. time.Sleep(1 * time.Second)
  88. // Verify that we have five machines.
  89. result, err := c.Get("_etcd/machines", false, true)
  90. assert.NoError(t, err)
  91. assert.Equal(t, len(result.Node.Nodes), 5)
  92. // Reconfigure with a short remove delay (2 second).
  93. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`))
  94. if !assert.Equal(t, resp.StatusCode, 200) {
  95. t.FailNow()
  96. }
  97. // Wait for a monitor cycle before checking for removal.
  98. time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
  99. // Verify that we now have four peers.
  100. result, err = c.Get("_etcd/machines", false, true)
  101. assert.NoError(t, err)
  102. assert.Equal(t, len(result.Node.Nodes), 4)
  103. // Remove peer.
  104. etcd := etcds[1]
  105. etcds = append(etcds[:1], etcds[2:]...)
  106. if err := etcd.Kill(); err != nil {
  107. panic(err.Error())
  108. }
  109. etcd.Release()
  110. // Wait for it to get dropped.
  111. time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second))
  112. // Wait for the standby to join.
  113. time.Sleep((1 * time.Second) + (1 * time.Second))
  114. // Verify that we have 4 peers.
  115. result, err = c.Get("_etcd/machines", true, true)
  116. assert.NoError(t, err)
  117. assert.Equal(t, len(result.Node.Nodes), 4)
  118. // Verify that node2 is not one of those peers.
  119. _, err = c.Get("_etcd/machines/node2", false, false)
  120. assert.Error(t, err)
  121. }
  122. // Create a full cluster and then change the active size gradually.
  123. func TestStandbyGradualChange(t *testing.T) {
  124. clusterSize := 9
  125. _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
  126. assert.NoError(t, err)
  127. defer DestroyCluster(etcds)
  128. if err != nil {
  129. t.Fatal("cannot create cluster")
  130. }
  131. time.Sleep(time.Second)
  132. c := etcd.NewClient(nil)
  133. c.SyncCluster()
  134. num := clusterSize
  135. for inc := 0; inc < 2; inc++ {
  136. for i := 0; i < 6; i++ {
  137. // Verify that we just have i machines.
  138. result, err := c.Get("_etcd/machines", false, true)
  139. assert.NoError(t, err)
  140. assert.Equal(t, len(result.Node.Nodes), num)
  141. if inc == 0 {
  142. num--
  143. } else {
  144. num++
  145. }
  146. t.Log("Reconfigure with active size", num)
  147. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
  148. if !assert.Equal(t, resp.StatusCode, 200) {
  149. t.FailNow()
  150. }
  151. if inc == 0 {
  152. // Wait for monitor cycles before checking for demotion.
  153. time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
  154. } else {
  155. time.Sleep(time.Second + (1 * time.Second))
  156. }
  157. // Verify that we now have peers.
  158. result, err = c.Get("_etcd/machines", false, true)
  159. assert.NoError(t, err)
  160. assert.Equal(t, len(result.Node.Nodes), num)
  161. t.Log("Test the functionality of all servers")
  162. // Set key.
  163. if _, err := c.Set("foo", "bar", 0); err != nil {
  164. panic(err)
  165. }
  166. time.Sleep(100 * time.Millisecond)
  167. // Check that all peers and standbys have the value.
  168. for i := range etcds {
  169. resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
  170. if assert.NoError(t, err) {
  171. body := tests.ReadBodyJSON(resp)
  172. if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
  173. assert.Equal(t, node["value"], "bar")
  174. }
  175. }
  176. }
  177. }
  178. }
  179. }
  180. // Create a full cluster and then change the active size dramatically.
  181. func TestStandbyDramaticChange(t *testing.T) {
  182. clusterSize := 9
  183. _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
  184. assert.NoError(t, err)
  185. defer DestroyCluster(etcds)
  186. if err != nil {
  187. t.Fatal("cannot create cluster")
  188. }
  189. time.Sleep(time.Second)
  190. c := etcd.NewClient(nil)
  191. c.SyncCluster()
  192. num := clusterSize
  193. for i := 0; i < 3; i++ {
  194. for inc := 0; inc < 2; inc++ {
  195. // Verify that we just have i machines.
  196. result, err := c.Get("_etcd/machines", false, true)
  197. assert.NoError(t, err)
  198. assert.Equal(t, len(result.Node.Nodes), num)
  199. if inc == 0 {
  200. num -= 6
  201. } else {
  202. num += 6
  203. }
  204. t.Log("Reconfigure with active size", num)
  205. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
  206. if !assert.Equal(t, resp.StatusCode, 200) {
  207. t.FailNow()
  208. }
  209. if inc == 0 {
  210. // Wait for monitor cycles before checking for demotion.
  211. time.Sleep(6*server.ActiveMonitorTimeout + (1 * time.Second))
  212. } else {
  213. time.Sleep(time.Second + (1 * time.Second))
  214. }
  215. // Verify that we now have peers.
  216. result, err = c.Get("_etcd/machines", false, true)
  217. assert.NoError(t, err)
  218. assert.Equal(t, len(result.Node.Nodes), num)
  219. t.Log("Test the functionality of all servers")
  220. // Set key.
  221. if _, err := c.Set("foo", "bar", 0); err != nil {
  222. panic(err)
  223. }
  224. time.Sleep(100 * time.Millisecond)
  225. // Check that all peers and standbys have the value.
  226. for i := range etcds {
  227. resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
  228. if assert.NoError(t, err) {
  229. body := tests.ReadBodyJSON(resp)
  230. if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
  231. assert.Equal(t, node["value"], "bar")
  232. }
  233. }
  234. }
  235. }
  236. }
  237. }
  238. func TestStandbyJoinMiss(t *testing.T) {
  239. clusterSize := 2
  240. _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
  241. if err != nil {
  242. t.Fatal("cannot create cluster")
  243. }
  244. defer DestroyCluster(etcds)
  245. c := etcd.NewClient(nil)
  246. c.SyncCluster()
  247. time.Sleep(1 * time.Second)
  248. // Verify that we have two machines.
  249. result, err := c.Get("_etcd/machines", false, true)
  250. assert.NoError(t, err)
  251. assert.Equal(t, len(result.Node.Nodes), clusterSize)
  252. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"removeDelay":4, "syncInterval":4}`))
  253. if !assert.Equal(t, resp.StatusCode, 200) {
  254. t.FailNow()
  255. }
  256. time.Sleep(time.Second)
  257. resp, _ = tests.Delete("http://localhost:7001/v2/admin/machines/node2", "application/json", nil)
  258. if !assert.Equal(t, resp.StatusCode, 200) {
  259. t.FailNow()
  260. }
  261. // Wait for a monitor cycle before checking for removal.
  262. time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
  263. // Verify that we now have one peer.
  264. result, err = c.Get("_etcd/machines", false, true)
  265. assert.NoError(t, err)
  266. assert.Equal(t, len(result.Node.Nodes), 1)
  267. // Simulate the join failure
  268. _, err = server.NewClient(nil).AddMachine("http://localhost:7001",
  269. &server.JoinCommand{
  270. MinVersion: store.MinVersion(),
  271. MaxVersion: store.MaxVersion(),
  272. Name: "node2",
  273. RaftURL: "http://127.0.0.1:7002",
  274. EtcdURL: "http://127.0.0.1:4002",
  275. })
  276. assert.NoError(t, err)
  277. time.Sleep(6 * time.Second)
  278. go tests.Delete("http://localhost:7001/v2/admin/machines/node2", "application/json", nil)
  279. time.Sleep(time.Second)
  280. result, err = c.Get("_etcd/machines", false, true)
  281. assert.NoError(t, err)
  282. assert.Equal(t, len(result.Node.Nodes), 1)
  283. }