etcd_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net"
  18. "net/http"
  19. "net/http/httptest"
  20. "net/url"
  21. "os"
  22. "reflect"
  23. "strings"
  24. "testing"
  25. "time"
  26. "github.com/coreos/etcd/conf"
  27. "github.com/coreos/etcd/store"
  28. )
  29. func TestMultipleNodes(t *testing.T) {
  30. defer afterTest(t)
  31. tests := []int{1, 3, 5, 9, 11}
  32. for _, tt := range tests {
  33. c := &testCluster{Size: tt}
  34. c.Start()
  35. c.Destroy()
  36. }
  37. }
  38. func TestMultipleTLSNodes(t *testing.T) {
  39. defer afterTest(t)
  40. tests := []int{1, 3, 5}
  41. for _, tt := range tests {
  42. c := &testCluster{Size: tt, TLS: true}
  43. c.Start()
  44. c.Destroy()
  45. }
  46. }
  47. func TestV2Redirect(t *testing.T) {
  48. defer afterTest(t)
  49. c := &testCluster{Size: 3}
  50. c.Start()
  51. defer c.Destroy()
  52. u := c.URL(1)
  53. ru := fmt.Sprintf("%s%s", c.URL(0), "/v2/keys/foo")
  54. tc := NewTestClient()
  55. v := url.Values{}
  56. v.Set("value", "XXX")
  57. resp, _ := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v)
  58. if resp.StatusCode != http.StatusTemporaryRedirect {
  59. t.Errorf("status = %d, want %d", resp.StatusCode, http.StatusTemporaryRedirect)
  60. }
  61. location, err := resp.Location()
  62. if err != nil {
  63. t.Errorf("want err = %, want nil", err)
  64. }
  65. if location.String() != ru {
  66. t.Errorf("location = %v, want %v", location.String(), ru)
  67. }
  68. resp.Body.Close()
  69. }
  70. func TestRemove(t *testing.T) {
  71. defer afterTest(t)
  72. tests := []int{3, 4, 5, 6}
  73. for aa := 0; aa < 1; aa++ {
  74. for k, tt := range tests {
  75. cl := testCluster{Size: tt}
  76. cl.Start()
  77. lead, _ := cl.Leader()
  78. config := conf.NewClusterConfig()
  79. config.ActiveSize = 0
  80. if err := cl.Participant(int(lead)).setClusterConfig(config); err != nil {
  81. t.Fatalf("#%d: setClusterConfig err = %v", k, err)
  82. }
  83. // we don't remove the machine from 2-node cluster because it is
  84. // not 100 percent safe in our raft.
  85. // TODO(yichengq): improve it later.
  86. for i := 0; i < tt-2; i++ {
  87. id := int64(i)
  88. for {
  89. n := cl.Node(int(id))
  90. if n.e.mode.Get() == standbyMode {
  91. break
  92. }
  93. err := n.Participant().remove(id)
  94. if err == nil {
  95. break
  96. }
  97. switch err {
  98. case tmpErr:
  99. time.Sleep(defaultElection * 5 * time.Millisecond)
  100. case raftStopErr, stopErr:
  101. default:
  102. t.Fatal(err)
  103. }
  104. }
  105. cl.Node(i).WaitMode(standbyMode)
  106. }
  107. cl.Destroy()
  108. }
  109. }
  110. }
  111. // TODO(yicheng) Add test for becoming standby
  112. // maxSize -> standby
  113. // auto-demote -> standby
  114. // remove -> standby
  115. func TestReleaseVersion(t *testing.T) {
  116. defer afterTest(t)
  117. cl := testCluster{Size: 1}
  118. cl.Start()
  119. defer cl.Destroy()
  120. resp, err := http.Get(cl.URL(0) + "/version")
  121. if err != nil {
  122. t.Fatal(err)
  123. }
  124. defer resp.Body.Close()
  125. g, err := ioutil.ReadAll(resp.Body)
  126. if err != nil {
  127. t.Error(err)
  128. }
  129. gs := string(g)
  130. w := fmt.Sprintf("etcd %s", releaseVersion)
  131. if gs != w {
  132. t.Errorf("version = %v, want %v", gs, w)
  133. }
  134. }
  135. func TestVersionCheck(t *testing.T) {
  136. defer afterTest(t)
  137. cl := testCluster{Size: 1}
  138. cl.Start()
  139. defer cl.Destroy()
  140. u := cl.URL(0)
  141. currentVersion := 2
  142. tests := []struct {
  143. version int
  144. wStatus int
  145. }{
  146. {currentVersion - 1, http.StatusForbidden},
  147. {currentVersion, http.StatusOK},
  148. {currentVersion + 1, http.StatusForbidden},
  149. }
  150. for i, tt := range tests {
  151. resp, err := http.Get(fmt.Sprintf("%s/raft/version/%d/check", u, tt.version))
  152. if err != nil {
  153. t.Fatal(err)
  154. }
  155. resp.Body.Close()
  156. if resp.StatusCode != tt.wStatus {
  157. t.Fatal("#%d: status = %d, want %d", i, resp.StatusCode, tt.wStatus)
  158. }
  159. }
  160. }
  161. func TestSingleNodeRecovery(t *testing.T) {
  162. defer afterTest(t)
  163. c := newTestConfig()
  164. ts := testServer{Id: genId(), Config: c}
  165. ts.Start()
  166. defer ts.Destroy()
  167. ts.WaitMode(participantMode)
  168. key := "/foo"
  169. ev, err := ts.Participant().Set(key, false, "bar", time.Now().Add(time.Second*100))
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. ts.Stop()
  174. ts = testServer{Id: ts.Id, Config: c}
  175. ts.Start()
  176. ts.WaitMode(participantMode)
  177. w, err := ts.Participant().Store.Watch(key, false, false, ev.Index())
  178. if err != nil {
  179. t.Fatal(err)
  180. }
  181. // give testing server time to load the previous WAL file
  182. select {
  183. case <-w.EventChan:
  184. case <-time.After(time.Second):
  185. t.Fatal("watch timeout")
  186. }
  187. }
  188. func TestTakingSnapshot(t *testing.T) {
  189. defer afterTest(t)
  190. cl := testCluster{Size: 1}
  191. cl.Start()
  192. defer cl.Destroy()
  193. // TODO(xiangli): tunable compact; reduce testing time
  194. for i := 0; i < defaultCompact; i++ {
  195. cl.Participant(0).Set("/foo", false, "bar", store.Permanent)
  196. }
  197. snap := cl.Participant(0).node.GetSnap()
  198. if snap.Index != defaultCompact {
  199. t.Errorf("snap.Index = %d, want %d", snap.Index, defaultCompact)
  200. }
  201. }
  202. func TestRestoreSnapshotFromLeader(t *testing.T) {
  203. defer afterTest(t)
  204. cl := testCluster{Size: 1}
  205. cl.Start()
  206. defer cl.Destroy()
  207. // let leader do snapshot
  208. for i := 0; i < defaultCompact; i++ {
  209. cl.Participant(0).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent)
  210. }
  211. // create one to join the cluster
  212. c := newTestConfig()
  213. c.Peers = []string{cl.URL(0)}
  214. ts := testServer{Config: c, Id: 1}
  215. ts.Start()
  216. defer ts.Destroy()
  217. ts.WaitMode(participantMode)
  218. // check new proposal could be submitted
  219. if _, err := cl.Participant(0).Set("/foo", false, "bar", store.Permanent); err != nil {
  220. t.Fatal(err)
  221. }
  222. // check store is recovered
  223. for i := 0; i < defaultCompact; i++ {
  224. ev, err := ts.Participant().Store.Get(fmt.Sprint("/foo", i), false, false)
  225. if err != nil {
  226. t.Errorf("get err = %v", err)
  227. continue
  228. }
  229. w := fmt.Sprint("bar", i)
  230. if g := *ev.Node.Value; g != w {
  231. t.Errorf("value = %v, want %v", g, w)
  232. }
  233. }
  234. // check new proposal could be committed in the new machine
  235. wch, err := ts.Participant().Watch("/foo", false, false, defaultCompact)
  236. if err != nil {
  237. t.Errorf("watch err = %v", err)
  238. }
  239. <-wch.EventChan
  240. g := ts.Participant().node.Nodes()
  241. w := cl.Participant(0).node.Nodes()
  242. if !reflect.DeepEqual(g, w) {
  243. t.Errorf("nodes = %v, want %v", g, w)
  244. }
  245. }
  246. type testCluster struct {
  247. Size int
  248. TLS bool
  249. nodes []*testServer
  250. }
  251. func (c *testCluster) Start() {
  252. if c.Size <= 0 {
  253. panic("cluster size <= 0")
  254. }
  255. nodes := make([]*testServer, c.Size)
  256. c.nodes = nodes
  257. nodes[0] = &testServer{Id: 0, TLS: c.TLS}
  258. nodes[0].Start()
  259. nodes[0].WaitMode(participantMode)
  260. seed := nodes[0].URL
  261. for i := 1; i < c.Size; i++ {
  262. cfg := newTestConfig()
  263. cfg.Peers = []string{seed}
  264. id := int64(i)
  265. s := &testServer{Config: cfg, Id: id, TLS: c.TLS}
  266. s.Start()
  267. nodes[i] = s
  268. // Wait for the previous configuration change to be committed
  269. // or this configuration request might be dropped.
  270. // Or it could be a slow join because it needs to retry.
  271. // TODO: this might not be true if we add param for retry interval.
  272. s.WaitMode(participantMode)
  273. w, err := s.Participant().Watch(v2machineKVPrefix, true, false, uint64(i))
  274. if err != nil {
  275. panic(err)
  276. }
  277. <-w.EventChan
  278. }
  279. c.wait()
  280. }
  281. func (c *testCluster) wait() {
  282. size := c.Size
  283. for i := 0; i < size; i++ {
  284. for k := 0; k < size; k++ {
  285. s := c.Node(i)
  286. wp := v2machineKVPrefix + fmt.Sprintf("/%d", c.Node(k).Id)
  287. w, err := s.Participant().Watch(wp, false, false, 1)
  288. if err != nil {
  289. panic(err)
  290. }
  291. <-w.EventChan
  292. }
  293. }
  294. clusterId := c.Participant(0).node.ClusterId()
  295. for i := 0; i < size; i++ {
  296. if g := c.Participant(i).node.ClusterId(); g != clusterId {
  297. panic(fmt.Sprintf("#%d: clusterId = %x, want %x", i, g, clusterId))
  298. }
  299. }
  300. }
  301. func (c *testCluster) Node(i int) *testServer {
  302. return c.nodes[i]
  303. }
  304. func (c *testCluster) Participant(i int) *participant {
  305. return c.Node(i).Participant()
  306. }
  307. func (c *testCluster) Standby(i int) *standby {
  308. return c.Node(i).Standby()
  309. }
  310. func (c *testCluster) URL(i int) string {
  311. return c.nodes[i].h.URL
  312. }
  313. func (c *testCluster) Restart() {
  314. for _, s := range c.nodes {
  315. s.Start()
  316. }
  317. }
  318. func (c *testCluster) Stop() {
  319. for _, s := range c.nodes {
  320. s.Stop()
  321. }
  322. }
  323. func (c *testCluster) Destroy() {
  324. for _, s := range c.nodes {
  325. s.Destroy()
  326. }
  327. }
  328. func (c *testCluster) Leader() (lead, term int64) {
  329. for {
  330. ls := make([]leadterm, 0, c.Size)
  331. for i := range c.nodes {
  332. switch c.Node(i).e.mode.Get() {
  333. case participantMode:
  334. ls = append(ls, c.Node(i).Lead())
  335. case standbyMode:
  336. //TODO(xiangli) add standby support
  337. case stopMode:
  338. }
  339. }
  340. if isSameLead(ls) {
  341. return ls[0].lead, ls[0].term
  342. }
  343. time.Sleep(c.Node(0).e.tickDuration * defaultElection)
  344. }
  345. }
  346. type leadterm struct {
  347. lead int64
  348. term int64
  349. }
  350. func isSameLead(ls []leadterm) bool {
  351. m := make(map[leadterm]int)
  352. for i := range ls {
  353. m[ls[i]] = m[ls[i]] + 1
  354. }
  355. if len(m) == 1 {
  356. if ls[0].lead == -1 {
  357. return false
  358. }
  359. return true
  360. }
  361. // todo(xiangli): printout the current cluster status for debugging....
  362. return false
  363. }
  364. type testServer struct {
  365. Config *conf.Config
  366. Id int64
  367. TLS bool
  368. // base URL of form http://ipaddr:port with no trailing slash
  369. URL string
  370. e *Server
  371. h *httptest.Server
  372. }
  373. func (s *testServer) Start() {
  374. if s.Config == nil {
  375. s.Config = newTestConfig()
  376. }
  377. c := s.Config
  378. if !strings.HasPrefix(c.DataDir, os.TempDir()) {
  379. panic("dataDir may pollute file system")
  380. }
  381. if c.Peer.CAFile != "" || c.Peer.CertFile != "" || c.Peer.KeyFile != "" {
  382. panic("use TLS field instead")
  383. }
  384. nc := *c
  385. e, err := New(&nc)
  386. if err != nil {
  387. panic(err)
  388. }
  389. s.e = e
  390. e.setId(s.Id)
  391. tick := time.Duration(c.Peer.HeartbeatInterval) * time.Millisecond
  392. e.SetTick(tick)
  393. m := http.NewServeMux()
  394. m.Handle("/", e)
  395. m.Handle("/raft", e.RaftHandler())
  396. m.Handle("/raft/", e.RaftHandler())
  397. m.Handle("/v2/admin/", e.RaftHandler())
  398. addr := c.Addr
  399. if s.URL != "" {
  400. addr = urlHost(s.URL)
  401. }
  402. s.h = startServingAddr(addr, m, s.TLS)
  403. s.URL = s.h.URL
  404. e.pubAddr = s.URL
  405. e.raftPubAddr = s.URL
  406. e.cfg.Addr = s.URL
  407. e.cfg.Peer.Addr = s.URL
  408. go e.Run()
  409. }
  410. func (s *testServer) WaitMode(mode int64) {
  411. for i := 0; i < 30; i++ {
  412. if s.e.mode.Get() == mode {
  413. return
  414. }
  415. time.Sleep(time.Millisecond)
  416. }
  417. panic("waitMode should never take more than 30ms.")
  418. }
  419. func (s *testServer) Participant() *participant {
  420. if s.e.mode.Get() != participantMode {
  421. return nil
  422. }
  423. return s.e.p
  424. }
  425. func (s *testServer) Standby() *standby {
  426. return s.e.s
  427. }
  428. func (s *testServer) Lead() leadterm {
  429. return leadterm{s.Participant().node.Leader(), s.Participant().node.Term()}
  430. }
  431. func (s *testServer) Stop() error {
  432. err := s.e.Stop()
  433. s.h.Close()
  434. return err
  435. }
  436. func (s *testServer) Destroy() error {
  437. err := s.Stop()
  438. if err := os.RemoveAll(s.Config.DataDir); err != nil {
  439. panic(err)
  440. }
  441. return err
  442. }
  443. func startServingAddr(addr string, h http.Handler, tls bool) *httptest.Server {
  444. var l net.Listener
  445. var err error
  446. for i := 0; i < 4; i++ {
  447. l, err = net.Listen("tcp", addr)
  448. if err == nil {
  449. break
  450. }
  451. if !strings.Contains(err.Error(), "address already in use") {
  452. panic(err)
  453. }
  454. time.Sleep(500 * time.Millisecond)
  455. }
  456. if l == nil {
  457. panic("cannot listen on " + addr)
  458. }
  459. hs := &httptest.Server{
  460. Listener: l,
  461. Config: &http.Server{Handler: h},
  462. }
  463. if tls {
  464. hs.StartTLS()
  465. } else {
  466. hs.Start()
  467. }
  468. return hs
  469. }
  470. func newTestConfig() *conf.Config {
  471. c := conf.New()
  472. c.Addr = "127.0.0.1:0"
  473. c.Peer.Addr = "127.0.0.1:0"
  474. c.Peer.HeartbeatInterval = 5
  475. c.Peer.ElectionTimeout = 25
  476. c.RetryInterval = 1 / 10.0
  477. dataDir, err := ioutil.TempDir(os.TempDir(), "etcd")
  478. if err != nil {
  479. panic(err)
  480. }
  481. c.DataDir = dataDir
  482. return c
  483. }
  484. func urlHost(urlStr string) string {
  485. u, err := url.Parse(urlStr)
  486. if err != nil {
  487. panic(err)
  488. }
  489. return u.Host
  490. }