etcd_test.go 14 KB


  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "fmt"
  16. "io/ioutil"
  17. "net"
  18. "net/http"
  19. "net/http/httptest"
  20. "net/url"
  21. "os"
  22. "path"
  23. "reflect"
  24. "strings"
  25. "testing"
  26. "time"
  27. "github.com/coreos/etcd/conf"
  28. "github.com/coreos/etcd/store"
  29. )
  30. func TestMultipleNodes(t *testing.T) {
  31. defer afterTest(t)
  32. tests := []int{1, 3, 5, 9, 11}
  33. for _, tt := range tests {
  34. c := &testCluster{Size: tt}
  35. c.Start()
  36. c.Destroy()
  37. }
  38. }
  39. func TestMultipleTLSNodes(t *testing.T) {
  40. defer afterTest(t)
  41. tests := []int{1, 3, 5}
  42. for _, tt := range tests {
  43. c := &testCluster{Size: tt, TLS: true}
  44. c.Start()
  45. c.Destroy()
  46. }
  47. }
  48. func TestV2Redirect(t *testing.T) {
  49. defer afterTest(t)
  50. c := &testCluster{Size: 3}
  51. c.Start()
  52. defer c.Destroy()
  53. u := c.URL(1)
  54. ru := fmt.Sprintf("%s%s", c.URL(0), "/v2/keys/foo")
  55. tc := NewTestClient()
  56. v := url.Values{}
  57. v.Set("value", "XXX")
  58. resp, _ := tc.PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v)
  59. if resp.StatusCode != http.StatusTemporaryRedirect {
  60. t.Errorf("status = %d, want %d", resp.StatusCode, http.StatusTemporaryRedirect)
  61. }
  62. location, err := resp.Location()
  63. if err != nil {
  64. t.Errorf("want err = %, want nil", err)
  65. }
  66. if location.String() != ru {
  67. t.Errorf("location = %v, want %v", location.String(), ru)
  68. }
  69. resp.Body.Close()
  70. }
  71. func TestRemove(t *testing.T) {
  72. defer afterTest(t)
  73. tests := []int{3, 4, 5, 6}
  74. for aa := 0; aa < 1; aa++ {
  75. for k, tt := range tests {
  76. cl := testCluster{Size: tt}
  77. cl.Start()
  78. lead, _ := cl.Leader()
  79. config := conf.NewClusterConfig()
  80. config.ActiveSize = 0
  81. if err := cl.Participant(lead).setClusterConfig(config); err != nil {
  82. t.Fatalf("#%d: setClusterConfig err = %v", k, err)
  83. }
  84. // we don't remove the machine from 2-node cluster because it is
  85. // not 100 percent safe in our raft.
  86. // TODO(yichengq): improve it later.
  87. for i := 0; i < tt-2; i++ {
  88. id := cl.Id(i)
  89. for {
  90. n := cl.Node(i)
  91. if n.e.mode.Get() == standbyMode {
  92. break
  93. }
  94. err := n.Participant().remove(id)
  95. if err == nil {
  96. break
  97. }
  98. switch err {
  99. case tmpErr:
  100. time.Sleep(defaultElection * 5 * time.Millisecond)
  101. case raftStopErr, stopErr:
  102. default:
  103. t.Fatal(err)
  104. }
  105. }
  106. cl.Node(i).WaitMode(standbyMode)
  107. }
  108. cl.Destroy()
  109. }
  110. }
  111. }
  112. // TODO(yicheng) Add test for becoming standby
  113. // maxSize -> standby
  114. // auto-demote -> standby
  115. // remove -> standby
  116. func TestReleaseVersion(t *testing.T) {
  117. defer afterTest(t)
  118. cl := testCluster{Size: 1}
  119. cl.Start()
  120. defer cl.Destroy()
  121. resp, err := http.Get(cl.URL(0) + "/version")
  122. if err != nil {
  123. t.Fatal(err)
  124. }
  125. defer resp.Body.Close()
  126. g, err := ioutil.ReadAll(resp.Body)
  127. if err != nil {
  128. t.Error(err)
  129. }
  130. gs := string(g)
  131. w := fmt.Sprintf("etcd %s", releaseVersion)
  132. if gs != w {
  133. t.Errorf("version = %v, want %v", gs, w)
  134. }
  135. }
  136. func TestVersionCheck(t *testing.T) {
  137. defer afterTest(t)
  138. cl := testCluster{Size: 1}
  139. cl.Start()
  140. defer cl.Destroy()
  141. u := cl.URL(0)
  142. currentVersion := 2
  143. tests := []struct {
  144. version int
  145. wStatus int
  146. }{
  147. {currentVersion - 1, http.StatusForbidden},
  148. {currentVersion, http.StatusOK},
  149. {currentVersion + 1, http.StatusForbidden},
  150. }
  151. for i, tt := range tests {
  152. resp, err := http.Get(fmt.Sprintf("%s/raft/version/%d/check", u, tt.version))
  153. if err != nil {
  154. t.Fatal(err)
  155. }
  156. resp.Body.Close()
  157. if resp.StatusCode != tt.wStatus {
  158. t.Fatal("#%d: status = %d, want %d", i, resp.StatusCode, tt.wStatus)
  159. }
  160. }
  161. }
  162. func TestSingleNodeRecovery(t *testing.T) {
  163. defer afterTest(t)
  164. c := newTestConfig()
  165. ts := testServer{Config: c}
  166. ts.Start()
  167. defer ts.Destroy()
  168. ts.WaitMode(participantMode)
  169. key := "/foo"
  170. ev, err := ts.Participant().Set(key, false, "bar", time.Now().Add(time.Second*100))
  171. if err != nil {
  172. t.Fatal(err)
  173. }
  174. ts.Stop()
  175. ts = testServer{Config: c}
  176. ts.Start()
  177. ts.WaitMode(participantMode)
  178. w, err := ts.Participant().Store.Watch(key, false, false, ev.Index())
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. // give testing server time to load the previous WAL file
  183. select {
  184. case <-w.EventChan:
  185. case <-time.After(time.Second):
  186. t.Fatal("watch timeout")
  187. }
  188. }
  189. func TestTakingSnapshot(t *testing.T) {
  190. defer afterTest(t)
  191. cl := testCluster{Size: 1}
  192. cl.Start()
  193. defer cl.Destroy()
  194. // TODO(xiangli): tunable compact; reduce testing time
  195. for i := 0; i < defaultCompact; i++ {
  196. cl.Participant(0).Set("/foo", false, "bar", store.Permanent)
  197. }
  198. snap := cl.Participant(0).node.GetSnap()
  199. if snap.Index != int64(defaultCompact) {
  200. t.Errorf("snap.Index = %d, want %d", snap.Index, defaultCompact)
  201. }
  202. }
  203. func TestRestoreSnapshotFromLeader(t *testing.T) {
  204. defer afterTest(t)
  205. cl := testCluster{Size: 1}
  206. cl.Start()
  207. defer cl.Destroy()
  208. // let leader do snapshot
  209. for i := 0; i < defaultCompact; i++ {
  210. cl.Participant(0).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent)
  211. }
  212. // create one to join the cluster
  213. c := newTestConfig()
  214. c.Name = "1"
  215. c.Peers = []string{cl.URL(0)}
  216. ts := &testServer{Config: c}
  217. cl.Add(ts)
  218. // check new proposal could be submitted
  219. if _, err := cl.Participant(0).Set("/foo", false, "bar", store.Permanent); err != nil {
  220. t.Fatal(err)
  221. }
  222. // check store is recovered
  223. for i := 0; i < defaultCompact; i++ {
  224. ev, err := ts.Participant().Store.Get(fmt.Sprint("/foo", i), false, false)
  225. if err != nil {
  226. t.Errorf("get err = %v", err)
  227. continue
  228. }
  229. w := fmt.Sprint("bar", i)
  230. if g := *ev.Node.Value; g != w {
  231. t.Errorf("value = %v, want %v", g, w)
  232. }
  233. }
  234. // check new proposal could be committed in the new machine
  235. wch, err := ts.Participant().Watch("/foo", false, false, uint64(defaultCompact))
  236. if err != nil {
  237. t.Errorf("watch err = %v", err)
  238. }
  239. <-wch.EventChan
  240. // check node map of two machines are the same
  241. g := ts.Participant().node.Nodes()
  242. w := cl.Participant(0).node.Nodes()
  243. if !reflect.DeepEqual(g, w) {
  244. t.Errorf("nodes = %v, want %v", g, w)
  245. }
  246. }
  247. func TestSaveSnapshot(t *testing.T) {
  248. defer afterTest(t)
  249. cl := testCluster{Size: 1}
  250. cl.Start()
  251. defer cl.Destroy()
  252. n := cl.Node(0)
  253. // TODO(xiangli): tunable compact; reduce testing time
  254. for i := 0; i < defaultCompact; i++ {
  255. n.Participant().Set("/foo", false, "bar", store.Permanent)
  256. }
  257. snapname := fmt.Sprintf("%016x-%016x-%016x.snap", n.Participant().clusterId, 1, defaultCompact)
  258. snappath := path.Join(n.Config.DataDir, "snap", snapname)
  259. if _, err := os.Stat(snappath); err != nil {
  260. t.Errorf("err = %v, want nil", err)
  261. }
  262. walname := fmt.Sprintf("%016x-%016x.wal", 1, defaultCompact)
  263. walpath := path.Join(n.Config.DataDir, "wal", walname)
  264. if _, err := os.Stat(walpath); err != nil {
  265. t.Errorf("err = %v, want nil", err)
  266. }
  267. }
  268. func TestRestoreSnapshotFromDisk(t *testing.T) {
  269. defer afterTest(t)
  270. tests := []int{1, 3, 5}
  271. // TODO(xiangli): tunable compact; reduce testing time
  272. oldDefaultCompact := defaultCompact
  273. defaultCompact = 10
  274. defer func() { defaultCompact = oldDefaultCompact }()
  275. for _, tt := range tests {
  276. cl := testCluster{Size: tt}
  277. cl.Start()
  278. defer cl.Destroy()
  279. lead, _ := cl.Leader()
  280. for i := 0; i < defaultCompact+10; i++ {
  281. cl.Participant(lead).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent)
  282. }
  283. cl.Stop()
  284. cl.Restart()
  285. lead, _ = cl.Leader()
  286. // check store is recovered
  287. for i := 0; i < defaultCompact+10; i++ {
  288. ev, err := cl.Participant(lead).Store.Get(fmt.Sprint("/foo", i), false, false)
  289. if err != nil {
  290. t.Errorf("get err = %v", err)
  291. continue
  292. }
  293. w := fmt.Sprint("bar", i)
  294. if g := *ev.Node.Value; g != w {
  295. t.Errorf("value = %v, want %v", g, w)
  296. }
  297. }
  298. // check new proposal could be submitted
  299. if _, err := cl.Participant(lead).Set("/foo", false, "bar", store.Permanent); err != nil {
  300. t.Fatal(err)
  301. }
  302. }
  303. }
// testCluster is a group of in-process testServers that Start launches together.
type testCluster struct {
	// Size is the number of servers Start launches; Add increments it.
	Size int
	// TLS makes each server launched by Start serve its HTTP front end via StartTLS.
	TLS bool
	// nodes holds the servers in the order used by Node/Participant/URL/Id.
	nodes []*testServer
}
  309. func (c *testCluster) Start() {
  310. if c.Size <= 0 {
  311. panic("cluster size <= 0")
  312. }
  313. nodes := make([]*testServer, c.Size)
  314. c.nodes = nodes
  315. cfg := newTestConfig()
  316. cfg.Name = "testServer-0"
  317. nodes[0] = &testServer{Config: cfg, TLS: c.TLS}
  318. nodes[0].Start()
  319. nodes[0].WaitMode(participantMode)
  320. seed := nodes[0].URL
  321. for i := 1; i < c.Size; i++ {
  322. cfg := newTestConfig()
  323. cfg.Name = "testServer-" + fmt.Sprint(i)
  324. cfg.Peers = []string{seed}
  325. s := &testServer{Config: cfg, TLS: c.TLS}
  326. s.Start()
  327. nodes[i] = s
  328. // Wait for the previous configuration change to be committed
  329. // or this configuration request might be dropped.
  330. // Or it could be a slow join because it needs to retry.
  331. // TODO: this might not be true if we add param for retry interval.
  332. s.WaitMode(participantMode)
  333. w, err := s.Participant().Watch(v2machineKVPrefix, true, false, uint64(i))
  334. if err != nil {
  335. panic(err)
  336. }
  337. <-w.EventChan
  338. }
  339. c.wait()
  340. }
  341. func (c *testCluster) wait() {
  342. size := c.Size
  343. for i := 0; i < size; i++ {
  344. for k := 0; k < size; k++ {
  345. s := c.Node(i)
  346. wp := v2machineKVPrefix + fmt.Sprintf("/%d", c.Id(k))
  347. w, err := s.Participant().Watch(wp, false, false, 1)
  348. if err != nil {
  349. panic(err)
  350. }
  351. <-w.EventChan
  352. }
  353. }
  354. clusterId := c.Participant(0).node.ClusterId()
  355. for i := 0; i < size; i++ {
  356. if g := c.Participant(i).node.ClusterId(); g != clusterId {
  357. panic(fmt.Sprintf("#%d: clusterId = %x, want %x", i, g, clusterId))
  358. }
  359. }
  360. }
  361. func (c *testCluster) Add(s *testServer) {
  362. lead, _ := c.Leader()
  363. // wait for the node to join the cluster
  364. // TODO(yichengq): remove this when we get rid of all timeouts
  365. wch, err := c.Participant(int(lead)).Watch(v2machineKVPrefix, true, false, 0)
  366. if err != nil {
  367. panic(err)
  368. }
  369. s.Start()
  370. <-wch.EventChan
  371. c.Size++
  372. c.nodes = append(c.nodes, s)
  373. }
  374. func (c *testCluster) Node(i int) *testServer {
  375. return c.nodes[i]
  376. }
  377. func (c *testCluster) Participant(i int) *participant {
  378. return c.Node(i).Participant()
  379. }
  380. func (c *testCluster) Standby(i int) *standby {
  381. return c.Node(i).Standby()
  382. }
  383. func (c *testCluster) URL(i int) string {
  384. return c.nodes[i].h.URL
  385. }
  386. func (c *testCluster) Id(i int) int64 {
  387. return c.Participant(i).id
  388. }
  389. func (c *testCluster) Restart() {
  390. for _, s := range c.nodes {
  391. s.Start()
  392. }
  393. }
  394. func (c *testCluster) Stop() {
  395. for _, s := range c.nodes {
  396. s.Stop()
  397. }
  398. }
  399. func (c *testCluster) Destroy() {
  400. for _, s := range c.nodes {
  401. s.Destroy()
  402. }
  403. }
  404. // Leader returns the index of leader in the cluster and its leader term.
  405. func (c *testCluster) Leader() (leadIdx int, term int64) {
  406. ids := make(map[int64]int)
  407. for {
  408. ls := make([]leadterm, 0, c.Size)
  409. for i := range c.nodes {
  410. switch c.Node(i).e.mode.Get() {
  411. case participantMode:
  412. ls = append(ls, c.Node(i).Lead())
  413. ids[c.Id(i)] = i
  414. case standbyMode:
  415. //TODO(xiangli) add standby support
  416. case stopMode:
  417. }
  418. }
  419. if isSameLead(ls) {
  420. return ids[ls[0].lead], ls[0].term
  421. }
  422. time.Sleep(c.Node(0).e.tickDuration * defaultElection)
  423. }
  424. }
  425. type leadterm struct {
  426. lead int64
  427. term int64
  428. }
  429. func isSameLead(ls []leadterm) bool {
  430. m := make(map[leadterm]int)
  431. for i := range ls {
  432. m[ls[i]] = m[ls[i]] + 1
  433. }
  434. if len(m) == 1 {
  435. if ls[0].lead == -1 {
  436. return false
  437. }
  438. return true
  439. }
  440. // todo(xiangli): printout the current cluster status for debugging....
  441. return false
  442. }
// testServer wraps one etcd Server and the httptest.Server placed in front of it.
type testServer struct {
	// Config is copied into a new Server on each Start; Start fills it with
	// newTestConfig() when nil.
	Config *conf.Config
	// TLS makes Start serve the HTTP front end via StartTLS.
	TLS bool
	// base URL (http:// or, with TLS, https://) of form scheme://ipaddr:port
	// with no trailing slash; set by Start and reused to rebind the same
	// host:port on a later Start
	URL string
	// e is the etcd Server built by the latest Start.
	e *Server
	// h is the HTTP test server routing requests to e.
	h *httptest.Server
}
  451. func (s *testServer) Start() {
  452. if s.Config == nil {
  453. s.Config = newTestConfig()
  454. }
  455. c := s.Config
  456. if !strings.HasPrefix(c.DataDir, os.TempDir()) {
  457. panic("dataDir may pollute file system")
  458. }
  459. if c.Peer.CAFile != "" || c.Peer.CertFile != "" || c.Peer.KeyFile != "" {
  460. panic("use TLS field instead")
  461. }
  462. nc := *c
  463. e, err := New(&nc)
  464. if err != nil {
  465. panic(err)
  466. }
  467. s.e = e
  468. tick := time.Duration(c.Peer.HeartbeatInterval) * time.Millisecond
  469. e.SetTick(tick)
  470. m := http.NewServeMux()
  471. m.Handle("/", e)
  472. m.Handle("/raft", e.RaftHandler())
  473. m.Handle("/raft/", e.RaftHandler())
  474. m.Handle("/v2/admin/", e.RaftHandler())
  475. addr := c.Addr
  476. if s.URL != "" {
  477. addr = urlHost(s.URL)
  478. }
  479. s.h = startServingAddr(addr, m, s.TLS)
  480. s.URL = s.h.URL
  481. e.pubAddr = s.URL
  482. e.raftPubAddr = s.URL
  483. e.cfg.Addr = s.URL
  484. e.cfg.Peer.Addr = s.URL
  485. go e.Run()
  486. }
  487. func (s *testServer) WaitMode(mode int64) {
  488. for i := 0; i < 30; i++ {
  489. if s.e.mode.Get() == mode {
  490. return
  491. }
  492. time.Sleep(time.Millisecond)
  493. }
  494. panic("waitMode should never take more than 30ms.")
  495. }
  496. func (s *testServer) Participant() *participant {
  497. if s.e.mode.Get() != participantMode {
  498. return nil
  499. }
  500. return s.e.p
  501. }
  502. func (s *testServer) Standby() *standby {
  503. return s.e.s
  504. }
  505. func (s *testServer) Lead() leadterm {
  506. return leadterm{s.Participant().node.Leader(), s.Participant().node.Term()}
  507. }
  508. func (s *testServer) Stop() error {
  509. err := s.e.Stop()
  510. s.h.Close()
  511. return err
  512. }
  513. func (s *testServer) Destroy() error {
  514. err := s.Stop()
  515. if err := os.RemoveAll(s.Config.DataDir); err != nil {
  516. panic(err)
  517. }
  518. return err
  519. }
  520. func startServingAddr(addr string, h http.Handler, tls bool) *httptest.Server {
  521. var l net.Listener
  522. var err error
  523. for i := 0; i < 4; i++ {
  524. l, err = net.Listen("tcp", addr)
  525. if err == nil {
  526. break
  527. }
  528. if !strings.Contains(err.Error(), "address already in use") {
  529. panic(err)
  530. }
  531. time.Sleep(500 * time.Millisecond)
  532. }
  533. if l == nil {
  534. panic("cannot listen on " + addr)
  535. }
  536. hs := &httptest.Server{
  537. Listener: l,
  538. Config: &http.Server{Handler: h},
  539. }
  540. if tls {
  541. hs.StartTLS()
  542. } else {
  543. hs.Start()
  544. }
  545. return hs
  546. }
  547. func newTestConfig() *conf.Config {
  548. c := conf.New()
  549. c.Addr = "127.0.0.1:0"
  550. c.Peer.Addr = "127.0.0.1:0"
  551. c.Peer.HeartbeatInterval = 5
  552. c.Peer.ElectionTimeout = 25
  553. c.RetryInterval = 1 / 10.0
  554. dataDir, err := ioutil.TempDir(os.TempDir(), "etcd")
  555. if err != nil {
  556. panic(err)
  557. }
  558. c.DataDir = dataDir
  559. return c
  560. }
  561. func urlHost(urlStr string) string {
  562. u, err := url.Parse(urlStr)
  563. if err != nil {
  564. panic(err)
  565. }
  566. return u.Host
  567. }