util_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net"
  18. "net/http"
  19. "net/http/httptest"
  20. "net/url"
  21. "os"
  22. "strings"
  23. "testing"
  24. "time"
  25. "github.com/coreos/etcd/conf"
  26. )
  27. func TestMultipleNodes(t *testing.T) {
  28. defer afterTest(t)
  29. tests := []int{1, 3, 5, 9, 11}
  30. for _, tt := range tests {
  31. c := &testCluster{Size: tt}
  32. c.Start()
  33. c.Destroy()
  34. }
  35. }
  36. func TestMultipleTLSNodes(t *testing.T) {
  37. defer afterTest(t)
  38. tests := []int{1, 3, 5}
  39. for _, tt := range tests {
  40. c := &testCluster{Size: tt, TLS: true}
  41. c.Start()
  42. c.Destroy()
  43. }
  44. }
  45. type testCluster struct {
  46. Size int
  47. TLS bool
  48. nodes []*testServer
  49. }
  50. func (c *testCluster) Start() {
  51. if c.Size <= 0 {
  52. panic("cluster size <= 0")
  53. }
  54. nodes := make([]*testServer, c.Size)
  55. c.nodes = nodes
  56. nodes[0] = &testServer{Id: 0, TLS: c.TLS}
  57. nodes[0].Start()
  58. if !nodes[0].WaitMode(participantMode, 1) {
  59. panic("cannot wait until participantMode")
  60. }
  61. seed := nodes[0].URL
  62. for i := 1; i < c.Size; i++ {
  63. conf := newTestConfig()
  64. conf.Peers = []string{seed}
  65. id := int64(i)
  66. s := &testServer{Config: conf, Id: id, TLS: c.TLS}
  67. s.Start()
  68. nodes[i] = s
  69. // Wait for the previous configuration change to be committed
  70. // or this configuration request might be dropped.
  71. // Or it could be a slow join because it needs to retry.
  72. // TODO: this might not be true if we add param for retry interval.
  73. if !s.WaitMode(participantMode, 3) {
  74. panic("cannot wait until participantMode")
  75. }
  76. w, err := s.P().Watch(v2machineKVPrefix, true, false, uint64(i))
  77. if err != nil {
  78. panic(err)
  79. }
  80. <-w.EventChan
  81. }
  82. c.wait()
  83. }
  84. func (c *testCluster) wait() {
  85. size := c.Size
  86. for i := 0; i < size; i++ {
  87. for k := 0; k < size; k++ {
  88. s := c.At(i)
  89. w, err := s.P().Watch(v2machineKVPrefix+fmt.Sprintf("/%d", c.At(k).Id), false, false, 1)
  90. if err != nil {
  91. panic(err)
  92. }
  93. <-w.EventChan
  94. }
  95. }
  96. clusterId := c.P(0).node.ClusterId()
  97. for i := 0; i < size; i++ {
  98. if g := c.P(i).node.ClusterId(); g != clusterId {
  99. panic(fmt.Sprintf("#%d: clusterId = %x, want %x", i, g, clusterId))
  100. }
  101. }
  102. }
  103. func (c *testCluster) At(i int) *testServer {
  104. return c.nodes[i]
  105. }
  106. func (c *testCluster) P(i int) *participant {
  107. return c.At(i).P()
  108. }
  109. func (c *testCluster) Destroy() {
  110. for _, s := range c.nodes {
  111. s.Destroy()
  112. }
  113. }
  114. type testServer struct {
  115. Config *conf.Config
  116. Id int64
  117. TLS bool
  118. // base URL of form http://ipaddr:port with no trailing slash
  119. URL string
  120. e *Server
  121. h *httptest.Server
  122. }
  123. func (s *testServer) Start() {
  124. if s.Config == nil {
  125. s.Config = newTestConfig()
  126. }
  127. c := s.Config
  128. if !strings.HasPrefix(c.DataDir, os.TempDir()) {
  129. panic("dataDir may pollute file system")
  130. }
  131. if c.Peer.CAFile != "" || c.Peer.CertFile != "" || c.Peer.KeyFile != "" {
  132. panic("use TLS field instead")
  133. }
  134. nc := new(conf.Config)
  135. *nc = *c
  136. e, err := New(nc)
  137. if err != nil {
  138. panic(err)
  139. }
  140. s.e = e
  141. e.setId(s.Id)
  142. tick := time.Duration(c.Peer.HeartbeatInterval) * time.Millisecond
  143. e.SetTick(tick)
  144. m := http.NewServeMux()
  145. m.Handle("/", e)
  146. m.Handle("/raft", e.RaftHandler())
  147. m.Handle("/raft/", e.RaftHandler())
  148. m.Handle("/v2/admin/", e.RaftHandler())
  149. addr := c.Addr
  150. if s.URL != "" {
  151. addr = urlHost(s.URL)
  152. }
  153. s.h = startServingAddr(addr, m, s.TLS)
  154. s.URL = s.h.URL
  155. e.pubAddr = s.URL
  156. e.raftPubAddr = s.URL
  157. go e.Run()
  158. }
  159. func (s *testServer) WaitMode(mode int64, ms int) bool {
  160. for i := 0; i < ms+1; i++ {
  161. if s.e.mode.Get() == mode {
  162. return true
  163. }
  164. time.Sleep(time.Millisecond)
  165. }
  166. return false
  167. }
  168. func (s *testServer) P() *participant {
  169. if s.e.mode.Get() != participantMode {
  170. panic("cannot get P if not in participant mode")
  171. }
  172. return s.e.p
  173. }
  174. func (s *testServer) Stop() error {
  175. err := s.e.Stop()
  176. s.h.Close()
  177. return err
  178. }
  179. func (s *testServer) Destroy() {
  180. s.Stop()
  181. if err := os.RemoveAll(s.Config.DataDir); err != nil {
  182. panic(err)
  183. }
  184. }
  // startServingAddr listens on the TCP address addr and serves h on it
  // through an httptest.Server, over TLS when tls is true and over plain HTTP
  // otherwise. The started server is returned; the caller is responsible for
  // closing it.
  //
  // Listening is attempted up to four times, 500ms apart, as long as the
  // failure is "address already in use". Any other listen error panics right
  // away with that error. If every attempt fails, the panic message includes
  // the last listen error so the failure can be diagnosed.
  func startServingAddr(addr string, h http.Handler, tls bool) *httptest.Server {
  	const (
  		maxAttempts = 4
  		retryDelay  = 500 * time.Millisecond
  	)

  	var l net.Listener
  	var err error
  	for i := 0; i < maxAttempts; i++ {
  		// Sleep only between attempts; there is nothing to wait for after
  		// the final one.
  		if i > 0 {
  			time.Sleep(retryDelay)
  		}
  		l, err = net.Listen("tcp", addr)
  		if err == nil {
  			break
  		}
  		// Only a busy address is worth retrying.
  		if !strings.Contains(err.Error(), "address already in use") {
  			panic(err)
  		}
  	}
  	if err != nil {
  		panic("cannot listen on " + addr + ": " + err.Error())
  	}

  	hs := &httptest.Server{
  		Listener: l,
  		Config:   &http.Server{Handler: h},
  	}
  	if tls {
  		hs.StartTLS()
  	} else {
  		hs.Start()
  	}
  	return hs
  }
  212. func newTestConfig() *conf.Config {
  213. c := conf.New()
  214. c.Addr = "127.0.0.1:0"
  215. c.Peer.Addr = "127.0.0.1:0"
  216. c.Peer.HeartbeatInterval = 5
  217. c.Peer.ElectionTimeout = 25
  218. dataDir, err := ioutil.TempDir(os.TempDir(), "etcd")
  219. if err != nil {
  220. panic(err)
  221. }
  222. c.DataDir = dataDir
  223. return c
  224. }
  // urlHost returns the host part (host or host:port) of urlStr. It panics
  // with the parse error if urlStr is not a valid URL.
  func urlHost(urlStr string) string {
  	parsed, err := url.Parse(urlStr)
  	if err == nil {
  		return parsed.Host
  	}
  	panic(err)
  }
  231. }