proxy_test.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package test
  2. import (
  3. "bytes"
  4. "fmt"
  5. "os"
  6. "testing"
  7. "time"
  8. "github.com/coreos/etcd/server"
  9. "github.com/coreos/etcd/tests"
  10. "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
  11. "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
  12. )
  13. // Create a full cluster and then add extra an extra proxy node.
  14. func TestProxy(t *testing.T) {
  15. clusterSize := 10 // DefaultActiveSize + 1
  16. _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
  17. assert.NoError(t, err)
  18. defer DestroyCluster(etcds)
  19. if err != nil {
  20. t.Fatal("cannot create cluster")
  21. }
  22. c := etcd.NewClient(nil)
  23. c.SyncCluster()
  24. // Set key.
  25. time.Sleep(time.Second)
  26. if _, err := c.Set("foo", "bar", 0); err != nil {
  27. panic(err)
  28. }
  29. time.Sleep(time.Second)
  30. // Check that all peers and proxies have the value.
  31. for i := range etcds {
  32. resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
  33. if assert.NoError(t, err) {
  34. body := tests.ReadBodyJSON(resp)
  35. if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
  36. assert.Equal(t, node["value"], "bar")
  37. }
  38. }
  39. }
  40. // Verify that we have one proxy.
  41. result, err := c.Get("_etcd/proxies", false, true)
  42. assert.NoError(t, err)
  43. assert.Equal(t, len(result.Node.Nodes), 1)
  44. // Reconfigure with larger active size (10 nodes) and wait for promotion.
  45. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`))
  46. if !assert.Equal(t, resp.StatusCode, 200) {
  47. t.FailNow()
  48. }
  49. time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
  50. // Verify that the proxy node is now a peer.
  51. result, err = c.Get("_etcd/proxies", false, true)
  52. assert.NoError(t, err)
  53. assert.Equal(t, len(result.Node.Nodes), 0)
  54. // Reconfigure with a smaller active size (8 nodes).
  55. resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`))
  56. if !assert.Equal(t, resp.StatusCode, 200) {
  57. t.FailNow()
  58. }
  59. // Wait for two monitor cycles before checking for demotion.
  60. time.Sleep((2 * server.ActiveMonitorTimeout) + (1 * time.Second))
  61. // Verify that we now have eight peers.
  62. result, err = c.Get("_etcd/machines", false, true)
  63. assert.NoError(t, err)
  64. assert.Equal(t, len(result.Node.Nodes), 8)
  65. // Verify that we now have two proxies.
  66. result, err = c.Get("_etcd/proxies", false, true)
  67. assert.NoError(t, err)
  68. assert.Equal(t, len(result.Node.Nodes), 2)
  69. }
  70. // Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion.
  71. func TestProxyAutoPromote(t *testing.T) {
  72. clusterSize := 10 // DefaultActiveSize + 1
  73. _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
  74. if err != nil {
  75. t.Fatal("cannot create cluster")
  76. }
  77. defer func() {
  78. // Wrap this in a closure so that it picks up the updated version of
  79. // the "etcds" variable.
  80. DestroyCluster(etcds)
  81. }()
  82. c := etcd.NewClient(nil)
  83. c.SyncCluster()
  84. time.Sleep(1 * time.Second)
  85. // Verify that we have one proxy.
  86. result, err := c.Get("_etcd/proxies", false, true)
  87. assert.NoError(t, err)
  88. assert.Equal(t, len(result.Node.Nodes), 1)
  89. // Reconfigure with a short promote delay (2 second).
  90. resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":2}`))
  91. if !assert.Equal(t, resp.StatusCode, 200) {
  92. t.FailNow()
  93. }
  94. // Remove peer.
  95. etcd := etcds[1]
  96. etcds = append(etcds[:1], etcds[2:]...)
  97. if err := etcd.Kill(); err != nil {
  98. panic(err.Error())
  99. }
  100. etcd.Release()
  101. // Wait for it to get dropped.
  102. time.Sleep(server.PeerActivityMonitorTimeout + (2 * time.Second))
  103. // Wait for the proxy to be promoted.
  104. time.Sleep(server.ActiveMonitorTimeout + (2 * time.Second))
  105. // Verify that we have 9 peers.
  106. result, err = c.Get("_etcd/machines", true, true)
  107. assert.NoError(t, err)
  108. assert.Equal(t, len(result.Node.Nodes), 9)
  109. // Verify that node10 is one of those peers.
  110. result, err = c.Get("_etcd/machines/node10", false, false)
  111. assert.NoError(t, err)
  112. // Verify that there are no more proxies.
  113. result, err = c.Get("_etcd/proxies", false, true)
  114. assert.NoError(t, err)
  115. if assert.Equal(t, len(result.Node.Nodes), 1) {
  116. assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/proxies/node2")
  117. }
  118. }