@@ -19,7 +19,6 @@ package etcd
 import (
	"fmt"
	"io/ioutil"
-	"math/rand"
	"net"
	"net/http"
	"net/http/httptest"
@@ -34,12 +33,37 @@ import (
	"github.com/coreos/etcd/store"
 )

+func TestMultipleNodes(t *testing.T) {
+	defer afterTest(t)
+	tests := []int{1, 3, 5, 9, 11}
+
+	for _, tt := range tests {
+		c := &testCluster{Size: tt}
+		c.Start()
+		c.Destroy()
+	}
+}
+
+func TestMultipleTLSNodes(t *testing.T) {
+	defer afterTest(t)
+	tests := []int{1, 3, 5}
+
+	for _, tt := range tests {
+		c := &testCluster{Size: tt, TLS: true}
+		c.Start()
+		c.Destroy()
+	}
+}
+
 func TestV2Redirect(t *testing.T) {
 	defer afterTest(t)
-	es, hs := buildCluster(3, false)
-	waitCluster(t, es)
-	u := hs[1].URL
-	ru := fmt.Sprintf("%s%s", hs[0].URL, "/v2/keys/foo")
+
+	c := &testCluster{Size: 3}
+	c.Start()
+	defer c.Destroy()
+
+	u := c.URL(1)
+	ru := fmt.Sprintf("%s%s", c.URL(0), "/v2/keys/foo")
 	tc := NewTestClient()

 	v := url.Values{}
@@ -56,186 +80,66 @@ func TestV2Redirect(t *testing.T) {
 	if location.String() != ru {
 		t.Errorf("location = %v, want %v", location.String(), ru)
 	}
-
 	resp.Body.Close()
-	destoryCluster(t, es, hs)
-}
-
-func TestAdd(t *testing.T) {
-	defer afterTest(t)
-	tests := []int{3, 4, 5, 6}
-
-	for _, tt := range tests {
-		es := make([]*Server, tt)
-		hs := make([]*httptest.Server, tt)
-		for i := 0; i < tt; i++ {
-			c := newTestConfig()
-			if i > 0 {
-				c.Peers = []string{hs[0].URL}
-			}
-			es[i], hs[i] = newUnstartedTestServer(c, int64(i), false)
-		}
-
-		go es[0].Run()
-		waitMode(participantMode, es[0])
-
-		for i := 1; i < tt; i++ {
-			id := int64(i)
-			for {
-				lead := es[0].p.node.Leader()
-				if lead == -1 {
-					time.Sleep(defaultElection * es[0].tickDuration)
-					continue
-				}
-
-				err := es[lead].p.add(id, es[id].raftPubAddr, es[id].pubAddr)
-				if err == nil {
-					break
-				}
-				switch err {
-				case tmpErr:
-					time.Sleep(defaultElection * es[0].tickDuration)
-				case raftStopErr, stopErr:
-					t.Fatalf("#%d on %d: unexpected stop", i, lead)
-				default:
-					t.Fatal(err)
-				}
-			}
-			go es[i].Run()
-			waitMode(participantMode, es[i])
-
-			for j := 0; j <= i; j++ {
-				p := fmt.Sprintf("%s/%d", v2machineKVPrefix, id)
-				w, err := es[j].p.Watch(p, false, false, 1)
-				if err != nil {
-					t.Errorf("#%d on %d: %v", i, j, err)
-					break
-				}
-				<-w.EventChan
-			}
-		}
-
-		destoryCluster(t, es, hs)
-	}
 }

 func TestRemove(t *testing.T) {
 	defer afterTest(t)
 	tests := []int{3, 4, 5, 6}
+	for aa := 0; aa < 1; aa++ {
+		for k, tt := range tests {
+			cl := testCluster{Size: tt}
+			cl.Start()
+
+			lead, _ := cl.Leader()
+			config := conf.NewClusterConfig()
+			config.ActiveSize = 0
+			if err := cl.Participant(int(lead)).setClusterConfig(config); err != nil {
+				t.Fatalf("#%d: setClusterConfig err = %v", k, err)
+			}

-	for k, tt := range tests {
-		es, hs := buildCluster(tt, false)
-		waitCluster(t, es)
-
-		lead, _ := waitLeader(es)
-		cfg := conf.NewClusterConfig()
-		cfg.ActiveSize = 0
-		if err := es[lead].p.setClusterConfig(cfg); err != nil {
-			t.Fatalf("#%d: setClusterConfig err = %v", k, err)
-		}
-
-		// we don't remove the machine from 2-node cluster because it is
-		// not 100 percent safe in our raft.
-		// TODO(yichengq): improve it later.
-		for i := 0; i < tt-2; i++ {
-			id := int64(i)
-			send := id
-			for {
-				send++
-				if send > int64(tt-1) {
-					send = id
-				}
-
-				lead := es[send].p.node.Leader()
-				if lead == -1 {
-					time.Sleep(defaultElection * 5 * time.Millisecond)
-					continue
-				}
-
-				err := es[lead].p.remove(id)
-				if err == nil {
-					break
-				}
-				switch err {
-				case tmpErr:
-					time.Sleep(defaultElection * 5 * time.Millisecond)
-				case raftStopErr, stopErr:
-					if lead == id {
+			// we don't remove the machine from a 2-node cluster because it is
+			// not 100 percent safe in our raft.
+			// TODO(yichengq): improve it later.
+			for i := 0; i < tt-2; i++ {
+				id := int64(i)
+				for {
+					n := cl.Node(int(id))
+					if n.e.mode.Get() == standbyMode {
+						break
+					}
+					err := n.Participant().remove(id)
+					if err == nil {
 						break
 					}
-				default:
-					t.Fatal(err)
+					switch err {
+					case tmpErr:
+						time.Sleep(defaultElection * 5 * time.Millisecond)
+					case raftStopErr, stopErr:
+					default:
+						t.Fatal(err)
+					}
 				}
-
+				cl.Node(i).WaitMode(standbyMode, 10)
 			}
-
-			waitMode(standbyMode, es[i])
+			cl.Destroy()
 		}
-
-		destoryCluster(t, es, hs)
 	}
 }

-func TestBecomeStandby(t *testing.T) {
-	defer afterTest(t)
-	size := 5
-	round := 1
-
-	for j := 0; j < round; j++ {
-		es, hs := buildCluster(size, false)
-		waitCluster(t, es)
-
-		lead, _ := waitActiveLeader(es)
-		i := rand.Intn(size)
-		// cluster only demotes follower
-		if int64(i) == lead {
-			i = (i + 1) % size
-		}
-		id := int64(i)
-
-		cfg := conf.NewClusterConfig()
-		cfg.SyncInterval = 1000
-
-		cfg.ActiveSize = size - 1
-		if err := es[lead].p.setClusterConfig(cfg); err != nil {
-			t.Fatalf("#%d: setClusterConfig err = %v", i, err)
-		}
-		for {
-			err := es[lead].p.remove(id)
-			if err == nil {
-				break
-			}
-			switch err {
-			case tmpErr:
-				time.Sleep(defaultElection * 5 * time.Millisecond)
-			default:
-				t.Fatalf("#%d: remove err = %v", i, err)
-			}
-		}
-
-		waitMode(standbyMode, es[i])
-
-		var leader int64
-		for k := 0; k < 3; k++ {
-			leader, _ = es[i].s.leaderInfo()
-			if leader != noneId {
-				break
-			}
-			time.Sleep(50 * time.Millisecond)
-		}
-		if g := leader; g != lead {
-			t.Errorf("#%d: lead = %d, want %d", i, g, lead)
-		}
-
-		destoryCluster(t, es, hs)
-	}
-}
+// TODO(yichengq): add tests for becoming standby:
+// maxSize -> standby
+// auto-demote -> standby
+// remove -> standby

 func TestReleaseVersion(t *testing.T) {
 	defer afterTest(t)
-	es, hs := buildCluster(1, false)

-	resp, err := http.Get(hs[0].URL + "/version")
+	cl := testCluster{Size: 1}
+	cl.Start()
+	defer cl.Destroy()
+
+	resp, err := http.Get(cl.URL(0) + "/version")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -249,19 +153,16 @@ func TestReleaseVersion(t *testing.T) {
 	if gs != w {
 		t.Errorf("version = %v, want %v", gs, w)
 	}
-
-	for i := range hs {
-		es[len(hs)-i-1].Stop()
-	}
-	for i := range hs {
-		hs[len(hs)-i-1].Close()
-	}
 }

 func TestVersionCheck(t *testing.T) {
 	defer afterTest(t)
-	es, hs := buildCluster(1, false)
-	u := hs[0].URL
+
+	cl := testCluster{Size: 1}
+	cl.Start()
+	defer cl.Destroy()
+
+	u := cl.URL(0)

 	currentVersion := 2
 	tests := []struct {
@@ -283,110 +184,84 @@ func TestVersionCheck(t *testing.T) {
 			t.Fatalf("#%d: status = %d, want %d", i, resp.StatusCode, tt.wStatus)
 		}
 	}
-
-	for i := range hs {
-		es[len(hs)-i-1].Stop()
-	}
-	for i := range hs {
-		hs[len(hs)-i-1].Close()
-	}
 }

 func TestSingleNodeRecovery(t *testing.T) {
 	defer afterTest(t)
-	id := genId()
 	c := newTestConfig()
-	e, h := newUnstartedTestServer(c, id, false)
-	startServer(t, e)
-	key := "/foo"
+	ts := testServer{Id: genId(), Config: c}
+	ts.Start()
+	defer ts.Destroy()

-	ev, err := e.p.Set(key, false, "bar", time.Now().Add(time.Second*100))
-	if err != nil {
-		t.Fatal(err)
-	}
+	ts.WaitMode(participantMode, 1)

-	w, err := e.p.Watch(key, false, false, ev.Index())
+	key := "/foo"
+	ev, err := ts.Participant().Set(key, false, "bar", time.Now().Add(time.Second*100))
 	if err != nil {
 		t.Fatal(err)
 	}
-	select {
-	case v := <-w.EventChan:
-		if v.Node.TTL < 95 {
-			t.Errorf("ttl = %d, want >= 95", v.Node.TTL)
-		}
-	case <-time.After(8 * defaultHeartbeat * e.tickDuration):
-		t.Fatal("watch timeout")
-	}
-
-	e.Stop()
-	h.Close()
+	ts.Stop()

-	time.Sleep(2 * time.Second)
-
-	nc := newTestConfig()
-	nc.DataDir = c.DataDir
-	e, h = newUnstartedTestServer(nc, id, false)
-	startServer(t, e)
-
-	waitLeader([]*Server{e})
-	w, err = e.p.Watch(key, false, false, ev.Index())
+	ts = testServer{Id: ts.Id, Config: c}
+	ts.Start()
+	ts.WaitMode(participantMode, 1)
+	w, err := ts.Participant().Store.Watch(key, false, false, ev.Index())
 	if err != nil {
 		t.Fatal(err)
 	}
+	// give the testing server time to load the previous WAL file
 	select {
-	case v := <-w.EventChan:
-		if v.Node.TTL > 99 {
-			t.Errorf("ttl = %d, want <= 99", v.Node.TTL)
-		}
-	case <-time.After(8 * defaultHeartbeat * e.tickDuration):
-		t.Fatal("2nd watch timeout")
+	case <-w.EventChan:
+	case <-time.After(time.Second):
+		t.Fatal("watch timeout")
 	}
-
-	destroyServer(t, e, h)
 }

 func TestTakingSnapshot(t *testing.T) {
 	defer afterTest(t)
-	es, hs := buildCluster(1, false)
+
+	cl := testCluster{Size: 1}
+	cl.Start()
+	defer cl.Destroy()
+
+	// TODO(xiangli): tunable compact; reduce testing time
 	for i := 0; i < defaultCompact; i++ {
-		es[0].p.Set("/foo", false, "bar", store.Permanent)
+		cl.Participant(0).Set("/foo", false, "bar", store.Permanent)
 	}
-	snap := es[0].p.node.GetSnap()
+	snap := cl.Participant(0).node.GetSnap()
 	if snap.Index != defaultCompact {
 		t.Errorf("snap.Index = %d, want %d", snap.Index, defaultCompact)
 	}
-
-	for i := range hs {
-		es[len(hs)-i-1].Stop()
-	}
-	for i := range hs {
-		hs[len(hs)-i-1].Close()
-	}
 }

 func TestRestoreSnapshotFromLeader(t *testing.T) {
 	defer afterTest(t)
-	es, hs := buildCluster(1, false)
+
+	cl := testCluster{Size: 1}
+	cl.Start()
+	defer cl.Destroy()
+
 	// let leader do snapshot
 	for i := 0; i < defaultCompact; i++ {
-		es[0].p.Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent)
+		cl.Participant(0).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent)
 	}

 	// create one to join the cluster
 	c := newTestConfig()
-	c.Peers = []string{hs[0].URL}
-	e, h := newUnstartedTestServer(c, 1, false)
-	go e.Run()
-	waitMode(participantMode, e)
+	c.Peers = []string{cl.URL(0)}
+	ts := testServer{Config: c, Id: 1}
+	ts.Start()
+	defer ts.Destroy()
+	ts.WaitMode(participantMode, 1)

 	// check new proposal could be submitted
-	if _, err := es[0].p.Set("/foo", false, "bar", store.Permanent); err != nil {
+	if _, err := cl.Participant(0).Set("/foo", false, "bar", store.Permanent); err != nil {
 		t.Fatal(err)
 	}

 	// check store is recovered
 	for i := 0; i < defaultCompact; i++ {
-		ev, err := e.p.Store.Get(fmt.Sprint("/foo", i), false, false)
+		ev, err := ts.Participant().Store.Get(fmt.Sprint("/foo", i), false, false)
 		if err != nil {
 			t.Errorf("get err = %v", err)
 			continue
@@ -398,61 +273,192 @@ func TestRestoreSnapshotFromLeader(t *testing.T) {
 	}

 	// check new proposal could be committed in the new machine
-	wch, err := e.p.Watch("/foo", false, false, defaultCompact)
+	wch, err := ts.Participant().Watch("/foo", false, false, defaultCompact)
 	if err != nil {
 		t.Errorf("watch err = %v", err)
 	}
 	<-wch.EventChan

-	g := e.p.node.Nodes()
-	w := es[0].p.node.Nodes()
+	g := ts.Participant().node.Nodes()
+	w := cl.Participant(0).node.Nodes()
 	if !reflect.DeepEqual(g, w) {
 		t.Errorf("nodes = %v, want %v", g, w)
 	}
+}
+
+type testCluster struct {
+	Size int
+	TLS  bool

-	e.Stop()
-	es[0].Stop()
-	h.Close()
-	hs[0].Close()
+	nodes []*testServer
 }

-func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {
-	bootstrapper := 0
-	es := make([]*Server, number)
-	hs := make([]*httptest.Server, number)
-	var seed string
+func (c *testCluster) Start() {
+	if c.Size <= 0 {
+		panic("cluster size <= 0")
+	}
+
+	nodes := make([]*testServer, c.Size)
+	c.nodes = nodes
+	nodes[0] = &testServer{Id: 0, TLS: c.TLS}
+	nodes[0].Start()
+	if !nodes[0].WaitMode(participantMode, 5) {
+		panic("cannot wait until participantMode")
+	}

-	for i := range es {
-		c := newTestConfig()
-		if seed != "" {
-			c.Peers = []string{seed}
+	seed := nodes[0].URL
+	for i := 1; i < c.Size; i++ {
+		cfg := newTestConfig()
+		cfg.Peers = []string{seed}
+		id := int64(i)
+		s := &testServer{Config: cfg, Id: id, TLS: c.TLS}
+		s.Start()
+		nodes[i] = s
+
+		// Wait for the previous configuration change to be committed,
+		// or this configuration request might be dropped.
+		// The join may also be slow, because the new server needs to retry.
+		// TODO: this might not be true if we add a param for the retry interval.
+		if !s.WaitMode(participantMode, 3) {
+			panic("cannot wait until participantMode")
+		}
+		w, err := s.Participant().Watch(v2machineKVPrefix, true, false, uint64(i))
+		if err != nil {
+			panic(err)
 		}
-		es[i], hs[i] = newUnstartedTestServer(c, int64(i), tls)
-
-		if i == bootstrapper {
-			seed = hs[i].URL
-		} else {
-			// wait for the previous configuration change to be committed
-			// or this configuration request might be dropped
-			w, err := es[0].p.Watch(v2machineKVPrefix, true, false, uint64(i))
+		<-w.EventChan
+	}
+	c.wait()
+}
+
+func (c *testCluster) wait() {
+	size := c.Size
+	for i := 0; i < size; i++ {
+		for k := 0; k < size; k++ {
+			s := c.Node(i)
+			wp := v2machineKVPrefix + fmt.Sprintf("/%d", c.Node(k).Id)
+			w, err := s.Participant().Watch(wp, false, false, 1)
 			if err != nil {
 				panic(err)
 			}
 			<-w.EventChan
 		}
-		go es[i].Run()
-		waitMode(participantMode, es[i])
 	}
-	return es, hs
+
+	clusterId := c.Participant(0).node.ClusterId()
+	for i := 0; i < size; i++ {
+		if g := c.Participant(i).node.ClusterId(); g != clusterId {
+			panic(fmt.Sprintf("#%d: clusterId = %x, want %x", i, g, clusterId))
+		}
+	}
+}
+
+func (c *testCluster) Node(i int) *testServer {
+	return c.nodes[i]
+}
+
+func (c *testCluster) Participant(i int) *participant {
+	return c.Node(i).Participant()
+}
+
+func (c *testCluster) Standby(i int) *standby {
+	return c.Node(i).Standby()
+}
+
+func (c *testCluster) URL(i int) string {
+	return c.nodes[i].h.URL
+}
+
+func (c *testCluster) Restart() {
+	for _, s := range c.nodes {
+		s.Start()
+	}
+}
+
+func (c *testCluster) Stop() {
+	for _, s := range c.nodes {
+		s.Stop()
+	}
+}
+
+func (c *testCluster) Destroy() {
+	for _, s := range c.nodes {
+		s.Destroy()
+	}
+}
+
+func (c *testCluster) Leader() (lead, term int64) {
+	for {
+		ls := make([]leadterm, 0, c.Size)
+		for i := range c.nodes {
+			switch c.Node(i).e.mode.Get() {
+			case participantMode:
+				ls = append(ls, c.Node(i).Lead())
+			case standbyMode:
+				// TODO(xiangli): add standby support
+			case stopMode:
+			}
+		}
+		if isSameLead(ls) {
+			return ls[0].lead, ls[0].term
+		}
+		time.Sleep(c.Node(0).e.tickDuration * defaultElection)
+	}
 }

-func newUnstartedTestServer(c *conf.Config, id int64, tls bool) (*Server, *httptest.Server) {
-	e, err := New(c)
+type leadterm struct {
+	lead int64
+	term int64
+}
+
+func isSameLead(ls []leadterm) bool {
+	m := make(map[leadterm]int)
+	for i := range ls {
+		m[ls[i]] = m[ls[i]] + 1
+	}
+	if len(m) == 1 {
+		if ls[0].lead == -1 {
+			return false
+		}
+		return true
+	}
+	// TODO(xiangli): print out the current cluster status for debugging.
+	return false
+}
+
+type testServer struct {
+	Config *conf.Config
+	Id     int64
+	TLS    bool
+
+	// base URL of form http://ipaddr:port with no trailing slash
+	URL string
+
+	e *Server
+	h *httptest.Server
+}
+
+func (s *testServer) Start() {
+	if s.Config == nil {
+		s.Config = newTestConfig()
+	}
+	c := s.Config
+	if !strings.HasPrefix(c.DataDir, os.TempDir()) {
+		panic("dataDir may pollute file system")
+	}
+	if c.Peer.CAFile != "" || c.Peer.CertFile != "" || c.Peer.KeyFile != "" {
+		panic("use TLS field instead")
+	}
+
+	nc := *c
+	e, err := New(&nc)
 	if err != nil {
 		panic(err)
 	}
-	e.setId(id)
-	e.SetTick(time.Millisecond * 5)
+	s.e = e
+	e.setId(s.Id)
+	tick := time.Duration(c.Peer.HeartbeatInterval) * time.Millisecond
+	e.SetTick(tick)

 	m := http.NewServeMux()
 	m.Handle("/", e)
@@ -460,14 +466,62 @@ func newUnstartedTestServer(c *conf.Config, id int64, tls bool) (*Server, *httpt
 	m.Handle("/raft/", e.RaftHandler())
 	m.Handle("/v2/admin/", e.RaftHandler())

-	u, err := url.Parse(c.Addr)
-	if err != nil {
+	addr := c.Addr
+	if s.URL != "" {
+		addr = urlHost(s.URL)
+	}
+	s.h = startServingAddr(addr, m, s.TLS)
+	s.URL = s.h.URL
+
+	e.pubAddr = s.URL
+	e.raftPubAddr = s.URL
+	go e.Run()
+}
+
+func (s *testServer) WaitMode(mode int64, timeout int) bool {
+	for i := 0; i < timeout+1; i++ {
+		if s.e.mode.Get() == mode {
+			return true
+		}
+		time.Sleep(time.Millisecond)
+	}
+	return false
+}
+
+func (s *testServer) Participant() *participant {
+	if s.e.mode.Get() != participantMode {
+		return nil
+	}
+	return s.e.p
+}
+
+func (s *testServer) Standby() *standby {
+	return s.e.s
+}
+
+func (s *testServer) Lead() leadterm {
+	return leadterm{s.Participant().node.Leader(), s.Participant().node.Term()}
+}
+
+func (s *testServer) Stop() error {
+	err := s.e.Stop()
+	s.h.Close()
+	return err
+}
+
+func (s *testServer) Destroy() error {
+	err := s.Stop()
+	if err := os.RemoveAll(s.Config.DataDir); err != nil {
 		panic(err)
 	}
+	return err
+}

+func startServingAddr(addr string, h http.Handler, tls bool) *httptest.Server {
 	var l net.Listener
-	for {
-		l, err = net.Listen("tcp", u.Host)
+	var err error
+	for i := 0; i < 4; i++ {
+		l, err = net.Listen("tcp", addr)
 		if err == nil {
 			break
 		}
@@ -476,72 +530,39 @@ func newUnstartedTestServer(c *conf.Config, id int64, tls bool) (*Server, *httpt
 		}
 		time.Sleep(500 * time.Millisecond)
 	}
-	h := &httptest.Server{
+	if l == nil {
+		panic("cannot listen on " + addr)
+	}
+	hs := &httptest.Server{
 		Listener: l,
-		Config:   &http.Server{Handler: m},
+		Config:   &http.Server{Handler: h},
 	}
 	if tls {
-		h.StartTLS()
+		hs.StartTLS()
 	} else {
-		h.Start()
-	}
-
-	e.raftPubAddr = h.URL
-	e.pubAddr = h.URL
-
-	return e, h
-}
-
-func destoryCluster(t *testing.T, es []*Server, hs []*httptest.Server) {
-	for i := range es {
-		e := es[len(es)-i-1]
-		e.Stop()
-		err := os.RemoveAll(e.cfg.DataDir)
-		if err != nil {
-			panic(err)
-			t.Fatal(err)
-		}
-	}
-	for i := range hs {
-		hs[len(hs)-i-1].Close()
+		hs.Start()
 	}
+	return hs
 }

-func destroyServer(t *testing.T, e *Server, h *httptest.Server) {
-	e.Stop()
-	h.Close()
-	err := os.RemoveAll(e.cfg.DataDir)
+func newTestConfig() *conf.Config {
+	c := conf.New()
+	c.Addr = "127.0.0.1:0"
+	c.Peer.Addr = "127.0.0.1:0"
+	c.Peer.HeartbeatInterval = 5
+	c.Peer.ElectionTimeout = 25
+	dataDir, err := ioutil.TempDir(os.TempDir(), "etcd")
 	if err != nil {
 		panic(err)
-		t.Fatal(err)
 	}
+	c.DataDir = dataDir
+	return c
 }

-func waitCluster(t *testing.T, es []*Server) {
-	n := len(es)
-	for _, e := range es {
-		for k := 0; k < n; k++ {
-			w, err := e.p.Watch(v2machineKVPrefix+fmt.Sprintf("/%d", es[k].id), true, false, 1)
-			if err != nil {
-				panic(err)
-			}
-			<-w.EventChan
-		}
-	}
-
-	clusterId := es[0].p.node.ClusterId()
-	for i, e := range es {
-		if e.p.node.ClusterId() != clusterId {
-			t.Errorf("#%d: clusterId = %x, want %x", i, e.p.node.ClusterId(), clusterId)
-		}
-	}
-}
-
-func waitMode(mode int64, e *Server) {
-	for {
-		if e.mode.Get() == mode {
-			return
-		}
-		time.Sleep(10 * time.Millisecond)
+func urlHost(urlStr string) string {
+	u, err := url.Parse(urlStr)
+	if err != nil {
+		panic(err)
	}
+	return u.Host
 }
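
For reference, a minimal sketch of how a new test could build on the helpers introduced above. It uses only names defined in this diff (testCluster, testServer, Leader, Participant, WaitMode, store.Permanent, afterTest, genId, newTestConfig); the test itself is illustrative and not part of the change:

func TestClusterHelpersSketch(t *testing.T) {
	defer afterTest(t)

	// Boot a 3-node cluster; Start blocks until every node has joined
	// and all nodes agree on the cluster id.
	c := &testCluster{Size: 3}
	c.Start()
	defer c.Destroy()

	// Leader blocks until all participants report the same leader and term.
	lead, _ := c.Leader()

	// Drive the cluster through the leader's participant.
	if _, err := c.Participant(int(lead)).Set("/sketch", false, "v", store.Permanent); err != nil {
		t.Fatal(err)
	}

	// A standalone server works the same way; TestSingleNodeRecovery above
	// restarts one by reusing its Id and Config across a Stop/Start cycle.
	ts := testServer{Id: genId(), Config: newTestConfig()}
	ts.Start()
	ts.WaitMode(participantMode, 1)
	ts.Destroy()
}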