cluster_test.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package integration
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "log"
  19. "math/rand"
  20. "net"
  21. "net/http"
  22. "net/http/httptest"
  23. "os"
  24. "reflect"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync/atomic"
  29. "testing"
  30. "time"
  31. "github.com/coreos/etcd/client"
  32. "github.com/coreos/etcd/etcdserver"
  33. "github.com/coreos/etcd/etcdserver/etcdhttp"
  34. "github.com/coreos/etcd/pkg/testutil"
  35. "github.com/coreos/etcd/pkg/transport"
  36. "github.com/coreos/etcd/pkg/types"
  37. "github.com/coreos/etcd/rafthttp"
  38. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  39. )
  40. const (
  41. tickDuration = 10 * time.Millisecond
  42. clusterName = "etcd"
  43. requestTimeout = 2 * time.Second
  44. )
  45. var (
  46. electionTicks = 10
  47. // integration test uses well-known ports to listen for each running member,
  48. // which ensures restarted member could listen on specific port again.
  49. nextListenPort int64 = 20000
  50. )
  51. func init() {
  52. // open microsecond-level time log for integration test debugging
  53. log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile)
  54. if t := os.Getenv("ETCD_ELECTION_TIMEOUT_TICKS"); t != "" {
  55. if i, err := strconv.ParseInt(t, 10, 64); err == nil {
  56. electionTicks = int(i)
  57. }
  58. }
  59. }
  60. func TestClusterOf1(t *testing.T) { testCluster(t, 1) }
  61. func TestClusterOf3(t *testing.T) { testCluster(t, 3) }
  62. func testCluster(t *testing.T, size int) {
  63. defer afterTest(t)
  64. c := NewCluster(t, size)
  65. c.Launch(t)
  66. defer c.Terminate(t)
  67. clusterMustProgress(t, c.Members)
  68. }
  69. func TestTLSClusterOf3(t *testing.T) {
  70. defer afterTest(t)
  71. c := NewTLSCluster(t, 3)
  72. c.Launch(t)
  73. defer c.Terminate(t)
  74. clusterMustProgress(t, c.Members)
  75. }
  76. func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
  77. func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }
  78. func testClusterUsingDiscovery(t *testing.T, size int) {
  79. defer afterTest(t)
  80. dc := NewCluster(t, 1)
  81. dc.Launch(t)
  82. defer dc.Terminate(t)
  83. // init discovery token space
  84. dcc := mustNewHTTPClient(t, dc.URLs())
  85. dkapi := client.NewKeysAPI(dcc)
  86. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  87. if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
  88. t.Fatal(err)
  89. }
  90. cancel()
  91. c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys")
  92. c.Launch(t)
  93. defer c.Terminate(t)
  94. clusterMustProgress(t, c.Members)
  95. }
  96. func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
  97. defer afterTest(t)
  98. dc := NewCluster(t, 1)
  99. dc.Launch(t)
  100. defer dc.Terminate(t)
  101. // init discovery token space
  102. dcc := mustNewHTTPClient(t, dc.URLs())
  103. dkapi := client.NewKeysAPI(dcc)
  104. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  105. if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
  106. t.Fatal(err)
  107. }
  108. cancel()
  109. c := NewTLSClusterByDiscovery(t, 3, dc.URL(0)+"/v2/keys")
  110. c.Launch(t)
  111. defer c.Terminate(t)
  112. clusterMustProgress(t, c.Members)
  113. }
  114. func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
  115. func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }
  116. func testDoubleClusterSize(t *testing.T, size int) {
  117. defer afterTest(t)
  118. c := NewCluster(t, size)
  119. c.Launch(t)
  120. defer c.Terminate(t)
  121. for i := 0; i < size; i++ {
  122. c.AddMember(t)
  123. }
  124. clusterMustProgress(t, c.Members)
  125. }
  126. func TestDoubleTLSClusterSizeOf3(t *testing.T) {
  127. defer afterTest(t)
  128. c := NewTLSCluster(t, 3)
  129. c.Launch(t)
  130. defer c.Terminate(t)
  131. for i := 0; i < 3; i++ {
  132. c.AddTLSMember(t)
  133. }
  134. clusterMustProgress(t, c.Members)
  135. }
  136. func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
  137. func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }
  138. func testDecreaseClusterSize(t *testing.T, size int) {
  139. defer afterTest(t)
  140. c := NewCluster(t, size)
  141. c.Launch(t)
  142. defer c.Terminate(t)
  143. // TODO: remove the last but one member
  144. for i := 0; i < size-1; i++ {
  145. id := c.Members[len(c.Members)-1].s.ID()
  146. c.RemoveMember(t, uint64(id))
  147. c.waitLeader(t, c.Members)
  148. }
  149. clusterMustProgress(t, c.Members)
  150. }
  151. func TestForceNewCluster(t *testing.T) {
  152. c := NewCluster(t, 3)
  153. c.Launch(t)
  154. cc := mustNewHTTPClient(t, []string{c.Members[0].URL()})
  155. kapi := client.NewKeysAPI(cc)
  156. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  157. resp, err := kapi.Create(ctx, "/foo", "bar")
  158. if err != nil {
  159. t.Fatalf("unexpected create error: %v", err)
  160. }
  161. cancel()
  162. // ensure create has been applied in this machine
  163. ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
  164. if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
  165. t.Fatalf("unexpected watch error: %v", err)
  166. }
  167. cancel()
  168. c.Members[0].Stop(t)
  169. c.Members[1].Terminate(t)
  170. c.Members[2].Terminate(t)
  171. c.Members[0].ForceNewCluster = true
  172. err = c.Members[0].Restart(t)
  173. if err != nil {
  174. t.Fatalf("unexpected ForceRestart error: %v", err)
  175. }
  176. defer c.Members[0].Terminate(t)
  177. c.waitLeader(t, c.Members[:1])
  178. // use new http client to init new connection
  179. cc = mustNewHTTPClient(t, []string{c.Members[0].URL()})
  180. kapi = client.NewKeysAPI(cc)
  181. // ensure force restart keep the old data, and new cluster can make progress
  182. ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
  183. if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
  184. t.Fatalf("unexpected watch error: %v", err)
  185. }
  186. cancel()
  187. clusterMustProgress(t, c.Members[:1])
  188. }
  189. func TestAddMemberAfterClusterFullRotation(t *testing.T) {
  190. defer afterTest(t)
  191. c := NewCluster(t, 3)
  192. c.Launch(t)
  193. defer c.Terminate(t)
  194. // remove all the previous three members and add in three new members.
  195. for i := 0; i < 3; i++ {
  196. c.RemoveMember(t, uint64(c.Members[0].s.ID()))
  197. c.waitLeader(t, c.Members)
  198. c.AddMember(t)
  199. c.waitLeader(t, c.Members)
  200. }
  201. c.AddMember(t)
  202. c.waitLeader(t, c.Members)
  203. clusterMustProgress(t, c.Members)
  204. }
  205. // Ensure we can remove a member then add a new one back immediately.
  206. func TestIssue2681(t *testing.T) {
  207. defer afterTest(t)
  208. c := NewCluster(t, 5)
  209. c.Launch(t)
  210. defer c.Terminate(t)
  211. c.RemoveMember(t, uint64(c.Members[4].s.ID()))
  212. c.waitLeader(t, c.Members)
  213. c.AddMember(t)
  214. c.waitLeader(t, c.Members)
  215. clusterMustProgress(t, c.Members)
  216. }
  217. // Ensure we can remove a member after a snapshot then add a new one back.
  218. func TestIssue2746(t *testing.T) {
  219. defer afterTest(t)
  220. c := NewCluster(t, 5)
  221. for _, m := range c.Members {
  222. m.SnapCount = 10
  223. }
  224. c.Launch(t)
  225. defer c.Terminate(t)
  226. // force a snapshot
  227. for i := 0; i < 20; i++ {
  228. clusterMustProgress(t, c.Members)
  229. }
  230. c.RemoveMember(t, uint64(c.Members[4].s.ID()))
  231. c.waitLeader(t, c.Members)
  232. c.AddMember(t)
  233. c.waitLeader(t, c.Members)
  234. clusterMustProgress(t, c.Members)
  235. }
  236. // Ensure etcd will not panic when removing a just started member.
  237. func TestIssue2904(t *testing.T) {
  238. defer afterTest(t)
  239. // start 1-member cluster to ensure member 0 is the leader of the cluster.
  240. c := NewCluster(t, 1)
  241. c.Launch(t)
  242. defer c.Terminate(t)
  243. c.AddMember(t)
  244. c.Members[1].Stop(t)
  245. // send remove member-1 request to the cluster.
  246. cc := mustNewHTTPClient(t, c.URLs())
  247. ma := client.NewMembersAPI(cc)
  248. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  249. // the proposal is not committed because member 1 is stopped, but the
  250. // proposal is appended to leader's raft log.
  251. ma.Remove(ctx, c.Members[1].s.ID().String())
  252. cancel()
  253. // restart member, and expect it to send updateAttr request.
  254. // the log in the leader is like this:
  255. // [..., remove 1, ..., update attr 1, ...]
  256. c.Members[1].Restart(t)
  257. // when the member comes back, it ack the proposal to remove itself,
  258. // and apply it.
  259. <-c.Members[1].s.StopNotify()
  260. // terminate removed member
  261. c.Members[1].Terminate(t)
  262. c.Members = c.Members[:1]
  263. // wait member to be removed.
  264. c.waitMembersMatch(t, c.HTTPMembers())
  265. }
  266. // clusterMustProgress ensures that cluster can make progress. It creates
  267. // a random key first, and check the new key could be got from all client urls
  268. // of the cluster.
  269. func clusterMustProgress(t *testing.T, membs []*member) {
  270. cc := mustNewHTTPClient(t, []string{membs[0].URL()})
  271. kapi := client.NewKeysAPI(cc)
  272. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  273. key := fmt.Sprintf("foo%d", rand.Int())
  274. resp, err := kapi.Create(ctx, "/"+key, "bar")
  275. if err != nil {
  276. t.Fatalf("create on %s error: %v", membs[0].URL(), err)
  277. }
  278. cancel()
  279. for i, m := range membs {
  280. u := m.URL()
  281. mcc := mustNewHTTPClient(t, []string{u})
  282. mkapi := client.NewKeysAPI(mcc)
  283. mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
  284. if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
  285. t.Fatalf("#%d: watch on %s error: %v", i, u, err)
  286. }
  287. mcancel()
  288. }
  289. }
// cluster is a group of in-process test members that are launched, queried
// and torn down together.
// TODO: support TLS
// NOTE(review): peer TLS is already available via NewTLSCluster/AddTLSMember;
// client URLs are still always plain http, which is presumably what this TODO
// refers to — confirm.
type cluster struct {
	Members []*member
}
  294. func fillClusterForMembers(ms []*member) error {
  295. addrs := make([]string, 0)
  296. for _, m := range ms {
  297. scheme := "http"
  298. if !m.PeerTLSInfo.Empty() {
  299. scheme = "https"
  300. }
  301. for _, l := range m.PeerListeners {
  302. addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
  303. }
  304. }
  305. clusterStr := strings.Join(addrs, ",")
  306. var err error
  307. for _, m := range ms {
  308. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  309. if err != nil {
  310. return err
  311. }
  312. }
  313. return nil
  314. }
  315. func newCluster(t *testing.T, size int, usePeerTLS bool) *cluster {
  316. c := &cluster{}
  317. ms := make([]*member, size)
  318. for i := 0; i < size; i++ {
  319. ms[i] = mustNewMember(t, c.name(i), usePeerTLS)
  320. }
  321. c.Members = ms
  322. if err := fillClusterForMembers(c.Members); err != nil {
  323. t.Fatal(err)
  324. }
  325. return c
  326. }
  327. func newClusterByDiscovery(t *testing.T, size int, usePeerTLS bool, url string) *cluster {
  328. c := &cluster{}
  329. ms := make([]*member, size)
  330. for i := 0; i < size; i++ {
  331. ms[i] = mustNewMember(t, c.name(i), usePeerTLS)
  332. ms[i].DiscoveryURL = url
  333. }
  334. c.Members = ms
  335. return c
  336. }
  337. // NewCluster returns an unlaunched cluster of the given size which has been
  338. // set to use static bootstrap.
  339. func NewCluster(t *testing.T, size int) *cluster {
  340. return newCluster(t, size, false)
  341. }
  342. // NewClusterUsingDiscovery returns an unlaunched cluster of the given size
  343. // which has been set to use the given url as discovery service to bootstrap.
  344. func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster {
  345. return newClusterByDiscovery(t, size, false, url)
  346. }
  347. func NewTLSCluster(t *testing.T, size int) *cluster {
  348. return newCluster(t, size, true)
  349. }
  350. func NewTLSClusterByDiscovery(t *testing.T, size int, url string) *cluster {
  351. return newClusterByDiscovery(t, size, true, url)
  352. }
  353. func (c *cluster) Launch(t *testing.T) {
  354. errc := make(chan error)
  355. for _, m := range c.Members {
  356. // Members are launched in separate goroutines because if they boot
  357. // using discovery url, they have to wait for others to register to continue.
  358. go func(m *member) {
  359. errc <- m.Launch()
  360. }(m)
  361. }
  362. for _ = range c.Members {
  363. if err := <-errc; err != nil {
  364. t.Fatalf("error setting up member: %v", err)
  365. }
  366. }
  367. // wait cluster to be stable to receive future client requests
  368. c.waitMembersMatch(t, c.HTTPMembers())
  369. c.waitVersion()
  370. }
  371. func (c *cluster) URL(i int) string {
  372. return c.Members[i].ClientURLs[0].String()
  373. }
  374. func (c *cluster) URLs() []string {
  375. urls := make([]string, 0)
  376. for _, m := range c.Members {
  377. for _, u := range m.ClientURLs {
  378. urls = append(urls, u.String())
  379. }
  380. }
  381. return urls
  382. }
  383. func (c *cluster) HTTPMembers() []client.Member {
  384. ms := make([]client.Member, len(c.Members))
  385. for i, m := range c.Members {
  386. scheme := "http"
  387. if !m.PeerTLSInfo.Empty() {
  388. scheme = "https"
  389. }
  390. ms[i].Name = m.Name
  391. for _, ln := range m.PeerListeners {
  392. ms[i].PeerURLs = append(ms[i].PeerURLs, scheme+"://"+ln.Addr().String())
  393. }
  394. for _, ln := range m.ClientListeners {
  395. ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
  396. }
  397. }
  398. return ms
  399. }
  400. func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
  401. m := mustNewMember(t, c.name(rand.Int()), usePeerTLS)
  402. scheme := "http"
  403. if usePeerTLS {
  404. scheme = "https"
  405. }
  406. // send add request to the cluster
  407. cc := mustNewHTTPClient(t, []string{c.URL(0)})
  408. ma := client.NewMembersAPI(cc)
  409. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  410. peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
  411. if _, err := ma.Add(ctx, peerURL); err != nil {
  412. t.Fatalf("add member on %s error: %v", c.URL(0), err)
  413. }
  414. cancel()
  415. // wait for the add node entry applied in the cluster
  416. members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
  417. c.waitMembersMatch(t, members)
  418. m.InitialPeerURLsMap = types.URLsMap{}
  419. for _, mm := range c.Members {
  420. m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
  421. }
  422. m.InitialPeerURLsMap[m.Name] = m.PeerURLs
  423. m.NewCluster = false
  424. if err := m.Launch(); err != nil {
  425. t.Fatal(err)
  426. }
  427. c.Members = append(c.Members, m)
  428. // wait cluster to be stable to receive future client requests
  429. c.waitMembersMatch(t, c.HTTPMembers())
  430. }
  431. func (c *cluster) AddMember(t *testing.T) {
  432. c.addMember(t, false)
  433. }
  434. func (c *cluster) AddTLSMember(t *testing.T) {
  435. c.addMember(t, true)
  436. }
  437. func (c *cluster) RemoveMember(t *testing.T, id uint64) {
  438. // send remove request to the cluster
  439. cc := mustNewHTTPClient(t, c.URLs())
  440. ma := client.NewMembersAPI(cc)
  441. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  442. if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
  443. t.Fatalf("unexpected remove error %v", err)
  444. }
  445. cancel()
  446. newMembers := make([]*member, 0)
  447. for _, m := range c.Members {
  448. if uint64(m.s.ID()) != id {
  449. newMembers = append(newMembers, m)
  450. } else {
  451. select {
  452. case <-m.s.StopNotify():
  453. m.Terminate(t)
  454. // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
  455. // TODO: remove connection write timeout by selecting on http response closeNotifier
  456. // blocking on https://github.com/golang/go/issues/9524
  457. case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
  458. t.Fatalf("failed to remove member %s in time", m.s.ID())
  459. }
  460. }
  461. }
  462. c.Members = newMembers
  463. c.waitMembersMatch(t, c.HTTPMembers())
  464. }
  465. func (c *cluster) Terminate(t *testing.T) {
  466. for _, m := range c.Members {
  467. m.Terminate(t)
  468. }
  469. }
  470. func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
  471. for _, u := range c.URLs() {
  472. cc := mustNewHTTPClient(t, []string{u})
  473. ma := client.NewMembersAPI(cc)
  474. for {
  475. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  476. ms, err := ma.List(ctx)
  477. cancel()
  478. if err == nil && isMembersEqual(ms, membs) {
  479. break
  480. }
  481. time.Sleep(tickDuration)
  482. }
  483. }
  484. return
  485. }
  486. func (c *cluster) waitLeader(t *testing.T, membs []*member) {
  487. possibleLead := make(map[uint64]bool)
  488. var lead uint64
  489. for _, m := range membs {
  490. possibleLead[uint64(m.s.ID())] = true
  491. }
  492. for lead == 0 || !possibleLead[lead] {
  493. lead = 0
  494. for _, m := range membs {
  495. if lead != 0 && lead != m.s.Lead() {
  496. lead = 0
  497. break
  498. }
  499. lead = m.s.Lead()
  500. }
  501. time.Sleep(10 * tickDuration)
  502. }
  503. }
  504. func (c *cluster) waitVersion() {
  505. for _, m := range c.Members {
  506. for {
  507. if m.s.ClusterVersion() != nil {
  508. break
  509. }
  510. time.Sleep(tickDuration)
  511. }
  512. }
  513. }
  514. func (c *cluster) name(i int) string {
  515. return fmt.Sprint("node", i)
  516. }
  517. // isMembersEqual checks whether two members equal except ID field.
  518. // The given wmembs should always set ID field to empty string.
  519. func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
  520. sort.Sort(SortableMemberSliceByPeerURLs(membs))
  521. sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
  522. for i := range membs {
  523. membs[i].ID = ""
  524. }
  525. return reflect.DeepEqual(membs, wmembs)
  526. }
  527. func newLocalListener(t *testing.T) net.Listener {
  528. port := atomic.AddInt64(&nextListenPort, 1)
  529. l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(port, 10))
  530. if err != nil {
  531. t.Fatal(err)
  532. }
  533. return l
  534. }
  535. func newListenerWithAddr(t *testing.T, addr string) net.Listener {
  536. var err error
  537. var l net.Listener
  538. // TODO: we want to reuse a previous closed port immediately.
  539. // a better way is to set SO_REUSExx instead of doing retry.
  540. for i := 0; i < 5; i++ {
  541. l, err = net.Listen("tcp", addr)
  542. if err == nil {
  543. break
  544. }
  545. time.Sleep(500 * time.Millisecond)
  546. }
  547. if err != nil {
  548. t.Fatal(err)
  549. }
  550. return l
  551. }
// member is a single in-process etcd server under test, together with the
// listeners and HTTP servers through which it is reached.
type member struct {
	etcdserver.ServerConfig
	PeerListeners, ClientListeners []net.Listener
	// A non-empty PeerTLSInfo enables peer TLS (https peer URLs).
	PeerTLSInfo transport.TLSInfo

	// raftHandler wraps the peer handler; Pause and Resume toggle it.
	raftHandler *testutil.PauseableHandler
	// s is the server created and started by Launch.
	s *etcdserver.EtcdServer
	// hss holds the peer and client HTTP servers started by Launch; Stop
	// and Terminate close them.
	hss []*httptest.Server
}
  561. // mustNewMember return an inited member with the given name. If usePeerTLS is
  562. // true, it will set PeerTLSInfo and use https scheme to communicate between
  563. // peers.
  564. func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
  565. var (
  566. testTLSInfo = transport.TLSInfo{
  567. KeyFile: "./fixtures/server.key.insecure",
  568. CertFile: "./fixtures/server.crt",
  569. TrustedCAFile: "./fixtures/ca.crt",
  570. ClientCertAuth: true,
  571. }
  572. err error
  573. )
  574. m := &member{}
  575. peerScheme := "http"
  576. if usePeerTLS {
  577. peerScheme = "https"
  578. }
  579. pln := newLocalListener(t)
  580. m.PeerListeners = []net.Listener{pln}
  581. m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
  582. if err != nil {
  583. t.Fatal(err)
  584. }
  585. if usePeerTLS {
  586. m.PeerTLSInfo = testTLSInfo
  587. }
  588. cln := newLocalListener(t)
  589. m.ClientListeners = []net.Listener{cln}
  590. m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
  591. if err != nil {
  592. t.Fatal(err)
  593. }
  594. m.Name = name
  595. m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
  596. if err != nil {
  597. t.Fatal(err)
  598. }
  599. clusterStr := fmt.Sprintf("%s=%s://%s", name, peerScheme, pln.Addr().String())
  600. m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  601. if err != nil {
  602. t.Fatal(err)
  603. }
  604. m.InitialClusterToken = clusterName
  605. m.NewCluster = true
  606. m.Transport = mustNewTransport(t, m.PeerTLSInfo)
  607. m.ElectionTicks = electionTicks
  608. m.TickMs = uint(tickDuration / time.Millisecond)
  609. return m
  610. }
  611. // Clone returns a member with the same server configuration. The returned
  612. // member will not set PeerListeners and ClientListeners.
  613. func (m *member) Clone(t *testing.T) *member {
  614. mm := &member{}
  615. mm.ServerConfig = m.ServerConfig
  616. var err error
  617. clientURLStrs := m.ClientURLs.StringSlice()
  618. mm.ClientURLs, err = types.NewURLs(clientURLStrs)
  619. if err != nil {
  620. // this should never fail
  621. panic(err)
  622. }
  623. peerURLStrs := m.PeerURLs.StringSlice()
  624. mm.PeerURLs, err = types.NewURLs(peerURLStrs)
  625. if err != nil {
  626. // this should never fail
  627. panic(err)
  628. }
  629. clusterStr := m.InitialPeerURLsMap.String()
  630. mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
  631. if err != nil {
  632. // this should never fail
  633. panic(err)
  634. }
  635. mm.InitialClusterToken = m.InitialClusterToken
  636. mm.Transport = mustNewTransport(t, m.PeerTLSInfo)
  637. mm.ElectionTicks = m.ElectionTicks
  638. mm.PeerTLSInfo = m.PeerTLSInfo
  639. return mm
  640. }
  641. // Launch starts a member based on ServerConfig, PeerListeners
  642. // and ClientListeners.
  643. func (m *member) Launch() error {
  644. var err error
  645. if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
  646. return fmt.Errorf("failed to initialize the etcd server: %v", err)
  647. }
  648. m.s.SyncTicker = time.Tick(500 * time.Millisecond)
  649. m.s.Start()
  650. m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())}
  651. for _, ln := range m.PeerListeners {
  652. hs := &httptest.Server{
  653. Listener: ln,
  654. Config: &http.Server{Handler: m.raftHandler},
  655. }
  656. if m.PeerTLSInfo.Empty() {
  657. hs.Start()
  658. } else {
  659. hs.TLS, err = m.PeerTLSInfo.ServerConfig()
  660. if err != nil {
  661. return err
  662. }
  663. hs.StartTLS()
  664. }
  665. m.hss = append(m.hss, hs)
  666. }
  667. for _, ln := range m.ClientListeners {
  668. hs := &httptest.Server{
  669. Listener: ln,
  670. Config: &http.Server{Handler: etcdhttp.NewClientHandler(m.s)},
  671. }
  672. hs.Start()
  673. m.hss = append(m.hss, hs)
  674. }
  675. return nil
  676. }
  677. func (m *member) WaitOK(t *testing.T) {
  678. cc := mustNewHTTPClient(t, []string{m.URL()})
  679. kapi := client.NewKeysAPI(cc)
  680. for {
  681. ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
  682. _, err := kapi.Get(ctx, "/", nil)
  683. if err != nil {
  684. time.Sleep(tickDuration)
  685. continue
  686. }
  687. cancel()
  688. break
  689. }
  690. for m.s.Leader() == 0 {
  691. time.Sleep(tickDuration)
  692. }
  693. }
  694. func (m *member) URL() string { return m.ClientURLs[0].String() }
// Pause pauses the member's peer HTTP handler and then its server's
// outgoing peer sends.
func (m *member) Pause() {
	m.raftHandler.Pause()
	m.s.PauseSending()
}

// Resume undoes Pause: it resumes the peer HTTP handler and then the
// server's outgoing peer sends.
func (m *member) Resume() {
	m.raftHandler.Resume()
	m.s.ResumeSending()
}
  703. // Stop stops the member, but the data dir of the member is preserved.
  704. func (m *member) Stop(t *testing.T) {
  705. m.s.Stop()
  706. for _, hs := range m.hss {
  707. hs.CloseClientConnections()
  708. hs.Close()
  709. }
  710. m.hss = nil
  711. }
  712. // Start starts the member using the preserved data dir.
  713. func (m *member) Restart(t *testing.T) error {
  714. newPeerListeners := make([]net.Listener, 0)
  715. for _, ln := range m.PeerListeners {
  716. newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
  717. }
  718. m.PeerListeners = newPeerListeners
  719. newClientListeners := make([]net.Listener, 0)
  720. for _, ln := range m.ClientListeners {
  721. newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
  722. }
  723. m.ClientListeners = newClientListeners
  724. return m.Launch()
  725. }
  726. // Terminate stops the member and removes the data dir.
  727. func (m *member) Terminate(t *testing.T) {
  728. m.s.Stop()
  729. for _, hs := range m.hss {
  730. hs.CloseClientConnections()
  731. hs.Close()
  732. }
  733. if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
  734. t.Fatal(err)
  735. }
  736. }
  737. func mustNewHTTPClient(t *testing.T, eps []string) client.Client {
  738. cfg := client.Config{Transport: mustNewTransport(t, transport.TLSInfo{}), Endpoints: eps}
  739. c, err := client.New(cfg)
  740. if err != nil {
  741. t.Fatal(err)
  742. }
  743. return c
  744. }
  745. func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport {
  746. tr, err := transport.NewTimeoutTransport(tlsInfo, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  747. if err != nil {
  748. t.Fatal(err)
  749. }
  750. return tr
  751. }
  752. type SortableMemberSliceByPeerURLs []client.Member
  753. func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
  754. func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
  755. return p[i].PeerURLs[0] < p[j].PeerURLs[0]
  756. }
  757. func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }