Browse Source

Merge pull request #243 from benbjohnson/reintegrate-functional-tests

Reintegrate functional tests into etcd.
Ben Johnson 12 years ago
parent
commit
66becddac3

+ 0 - 3
.travis.yml

@@ -1,8 +1,5 @@
 language: go
 go: 1.1
 
-install:
- - go get -u github.com/coreos/etcd-test-runner
-
 script:
  - ./test.sh

+ 4 - 6
test.sh

@@ -3,10 +3,8 @@
 # Get GOPATH, etc from build
 . ./build
 
-# Run the tests!
-go test -i
-go test -v
+# Unit tests
+go test -v ./store
 
-# Run the functional tests!
-go test -i github.com/coreos/etcd-test-runner
-ETCD_BIN_PATH=$(pwd)/etcd go test -v github.com/coreos/etcd-test-runner
+# Functional tests
+ETCD_BIN_PATH=$(pwd)/etcd go test -v ./tests/functional

+ 34 - 0
tests/functional/etcd_direct_call.go

@@ -0,0 +1,34 @@
+package test
+
+import (
+	"net/http"
+	"os"
+	"testing"
+	"time"
+)
+
+func BenchmarkEtcdDirectCall(b *testing.B) {
+	templateBenchmarkEtcdDirectCall(b, false)
+}
+
+func BenchmarkEtcdDirectCallTls(b *testing.B) {
+	templateBenchmarkEtcdDirectCall(b, true)
+}
+
+func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 3
+	_, etcds, _ := CreateCluster(clusterSize, procAttr, tls)
+
+	defer DestroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		resp, _ := http.Get("http://127.0.0.1:4001/test/speed")
+		resp.Body.Close()
+	}
+}

+ 17 - 0
tests/functional/init.go

@@ -0,0 +1,17 @@
+package test
+
+import (
+	"go/build"
+	"os"
+	"path/filepath"
+)
+
+var EtcdBinPath string
+
+func init() {
+	// Initialize the 'etcd' binary path or default it to the etcd diretory.
+	EtcdBinPath = os.Getenv("ETCD_BIN_PATH")
+	if EtcdBinPath == "" {
+		EtcdBinPath = filepath.Join(build.Default.GOPATH, "src", "github.com", "coreos", "etcd", "etcd")
+	}
+}

+ 57 - 0
tests/functional/internal_version_test.go

@@ -0,0 +1,57 @@
+package test
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"os"
+	"testing"
+	"time"
+)
+
+// Ensure that etcd does not come up if the internal raft versions do not match.
+func TestInternalVersion(t *testing.T) {
+	checkedVersion := false
+	testMux := http.NewServeMux()
+
+	testMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprintln(w, "This is not a version number")
+		checkedVersion = true
+	})
+
+	testMux.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) {
+		t.Fatal("should not attempt to join!")
+	})
+
+	ts := httptest.NewServer(testMux)
+	defer ts.Close()
+
+	fakeURL, _ := url.Parse(ts.URL)
+
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1", "-C=" + fakeURL.Host}
+
+	process, err := os.StartProcess(EtcdBinPath, args, procAttr)
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+		return
+	}
+	defer process.Kill()
+
+	time.Sleep(time.Second)
+
+	_, err = http.Get("http://127.0.0.1:4001")
+
+	if err == nil {
+		t.Fatal("etcd node should not be up")
+		return
+	}
+
+	if checkedVersion == false {
+		t.Fatal("etcd did not check the version")
+		return
+	}
+}
+

+ 63 - 0
tests/functional/kill_leader_test.go

@@ -0,0 +1,63 @@
+package test
+
+import (
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+)
+
+// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times.
+// It will print out the election time and the average election time.
+func TestKillLeader(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+	defer DestroyCluster(etcds)
+
+	stop := make(chan bool)
+	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
+
+	time.Sleep(time.Second)
+
+	go Monitor(clusterSize, 1, leaderChan, all, stop)
+
+	var totalTime time.Duration
+
+	leader := "http://127.0.0.1:7001"
+
+	for i := 0; i < clusterSize; i++ {
+		fmt.Println("leader is ", leader)
+		port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
+		num := port - 7001
+		fmt.Println("kill server ", num)
+		etcds[num].Kill()
+		etcds[num].Release()
+
+		start := time.Now()
+		for {
+			newLeader := <-leaderChan
+			if newLeader != leader {
+				leader = newLeader
+				break
+			}
+		}
+		take := time.Now().Sub(start)
+
+		totalTime += take
+		avgTime := totalTime / (time.Duration)(i+1)
+		fmt.Println("Total time:", totalTime, "; Avg time:", avgTime)
+		
+		etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
+	}
+	stop <- true
+}
+

+ 76 - 0
tests/functional/kill_random_test.go

@@ -0,0 +1,76 @@
+package test
+
+import (
+	"fmt"
+	"math/rand"
+	"os"
+	"testing"
+	"time"
+)
+
+// TestKillRandom kills random machines in the cluster and
+// restart them after all other machines agree on the same leader
+func TestKillRandom(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 9
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer DestroyCluster(etcds)
+
+	stop := make(chan bool)
+	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
+
+	time.Sleep(3 * time.Second)
+
+	go Monitor(clusterSize, 4, leaderChan, all, stop)
+
+	toKill := make(map[int]bool)
+
+	for i := 0; i < 20; i++ {
+		fmt.Printf("TestKillRandom Round[%d/20]\n", i)
+
+		j := 0
+		for {
+
+			r := rand.Int31n(9)
+			if _, ok := toKill[int(r)]; !ok {
+				j++
+				toKill[int(r)] = true
+			}
+
+			if j > 3 {
+				break
+			}
+
+		}
+
+		for num, _ := range toKill {
+			err := etcds[num].Kill()
+			if err != nil {
+				panic(err)
+			}
+			etcds[num].Wait()
+		}
+
+		time.Sleep(1 * time.Second)
+
+		<-leaderChan
+
+		for num, _ := range toKill {
+			etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
+		}
+
+		toKill = make(map[int]bool)
+		<-all
+	}
+
+	stop <- true
+}
+

+ 72 - 0
tests/functional/multi_node_kill_all_and_recovery_test.go

@@ -0,0 +1,72 @@
+package test
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+)
+
+// Create a five nodes
+// Kill all the nodes and restart
+func TestMultiNodeKillAllAndRecovery(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+	defer DestroyCluster(etcds)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+
+	time.Sleep(time.Second)
+
+	// send 10 commands
+	for i := 0; i < 10; i++ {
+		// Test Set
+		_, err := c.Set("foo", "bar", 0)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	time.Sleep(time.Second)
+
+	// kill all
+	DestroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	stop := make(chan bool)
+	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
+
+	time.Sleep(time.Second)
+
+	for i := 0; i < clusterSize; i++ {
+		etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
+	}
+
+	go Monitor(clusterSize, 1, leaderChan, all, stop)
+
+	<-all
+	<-leaderChan
+
+	result, err := c.Set("foo", "bar", 0)
+
+	if err != nil {
+		t.Fatalf("Recovery error: %s", err)
+	}
+
+	if result.Index != 18 {
+		t.Fatalf("recovery failed! [%d/18]", result.Index)
+	}
+}
+

+ 58 - 0
tests/functional/multi_node_kill_one_test.go

@@ -0,0 +1,58 @@
+package test
+
+import (
+	"fmt"
+	"math/rand"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+)
+
+// Create a five nodes
+// Randomly kill one of the node and keep on sending set command to the cluster
+func TestMultiNodeKillOne(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer DestroyCluster(etcds)
+
+	time.Sleep(2 * time.Second)
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+
+	stop := make(chan bool)
+	// Test Set
+	go Set(stop)
+
+	for i := 0; i < 10; i++ {
+		num := rand.Int() % clusterSize
+		fmt.Println("kill node", num+1)
+
+		// kill
+		etcds[num].Kill()
+		etcds[num].Release()
+		time.Sleep(time.Second)
+
+		// restart
+		etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
+		if err != nil {
+			panic(err)
+		}
+		time.Sleep(time.Second)
+	}
+	fmt.Println("stop")
+	stop <- true
+	<-stop
+}
+

+ 113 - 0
tests/functional/remove_node_test.go

@@ -0,0 +1,113 @@
+package test
+
+import (
+	"net/http"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+)
+
+// remove the node and node rejoin with previous log
+func TestRemoveNode(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 3
+	argGroup, etcds, _ := CreateCluster(clusterSize, procAttr, false)
+	defer DestroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+
+	rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)
+
+	client := &http.Client{}
+	for i := 0; i < 2; i++ {
+		for i := 0; i < 2; i++ {
+			client.Do(rmReq)
+
+			etcds[2].Wait()
+
+			resp, err := c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 2 {
+				t.Fatal("cannot remove machine")
+			}
+
+			if i == 1 {
+				// rejoin with log
+				etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr)
+			} else {
+				// rejoin without log
+				etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
+			}
+
+			if err != nil {
+				panic(err)
+			}
+
+			time.Sleep(time.Second)
+
+			resp, err = c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 3 {
+				t.Fatalf("add machine fails #1 (%d != 3)", len(resp))
+			}
+		}
+
+		// first kill the node, then remove it, then add it back
+		for i := 0; i < 2; i++ {
+			etcds[2].Kill()
+			etcds[2].Wait()
+
+			client.Do(rmReq)
+
+			resp, err := c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 2 {
+				t.Fatal("cannot remove machine")
+			}
+
+			if i == 1 {
+				// rejoin with log
+				etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2]), procAttr)
+			} else {
+				// rejoin without log
+				etcds[2], err = os.StartProcess(EtcdBinPath, append(argGroup[2], "-f"), procAttr)
+			}
+
+			if err != nil {
+				panic(err)
+			}
+
+			time.Sleep(time.Second)
+
+			resp, err = c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 3 {
+				t.Fatalf("add machine fails #2 (%d != 3)", len(resp))
+			}
+		}
+	}
+}

+ 62 - 0
tests/functional/simple_multi_node_test.go

@@ -0,0 +1,62 @@
+package test
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+)
+
+func TestSimpleMultiNode(t *testing.T) {
+	templateTestSimpleMultiNode(t, false)
+}
+
+func TestSimpleMultiNodeTls(t *testing.T) {
+	templateTestSimpleMultiNode(t, true)
+}
+
+// Create a three nodes and try to set value
+func templateTestSimpleMultiNode(t *testing.T, tls bool) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 3
+
+	_, etcds, err := CreateCluster(clusterSize, procAttr, tls)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer DestroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+
+	// Test Set
+	result, err := c.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	time.Sleep(time.Second)
+
+	result, err = c.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+}

+ 68 - 0
tests/functional/single_node_recovery_test.go

@@ -0,0 +1,68 @@
+package test
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+)
+
+// This test creates a single node and then set a value to it.
+// Then this test kills the node and restart it and tries to get the value again.
+func TestSingleNodeRecovery(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+	args := []string{"etcd", "-n=node1", "-d=/tmp/node1"}
+
+	process, err := os.StartProcess(EtcdBinPath, append(args, "-f"), procAttr)
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+		return
+	}
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+	// Test Set
+	result, err := c.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	time.Sleep(time.Second)
+
+	process.Kill()
+
+	process, err = os.StartProcess(EtcdBinPath, args, procAttr)
+	defer process.Kill()
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+		return
+	}
+
+	time.Sleep(time.Second)
+
+	results, err := c.Get("foo")
+	if err != nil {
+		t.Fatal("get fail: " + err.Error())
+		return
+	}
+
+	result = results[0]
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL > 99 {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Recovery Get failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+}
+

+ 77 - 0
tests/functional/single_node_test.go

@@ -0,0 +1,77 @@
+package test
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+)
+
+// Create a single node and try to set value
+func TestSingleNode(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"}
+
+	process, err := os.StartProcess(EtcdBinPath, args, procAttr)
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+		return
+	}
+	defer process.Kill()
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+	// Test Set
+	result, err := c.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
+		if err != nil {
+			t.Fatal("Set 1: ", err)
+		}
+
+		t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	time.Sleep(time.Second)
+
+	result, err = c.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 {
+		if err != nil {
+			t.Fatal("Set 2: ", err)
+		}
+		t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	// Add a test-and-set test
+
+	// First, we'll test we can change the value if we get it write
+	result, match, err := c.TestAndSet("foo", "bar", "foobar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 100 || !match {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	// Next, we'll make sure we can't set it without the correct prior value
+	_, _, err = c.TestAndSet("foo", "bar", "foofoo", 100)
+
+	if err == nil {
+		t.Fatalf("Set 4 expecting error when setting key with incorrect previous value")
+	}
+
+	// Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match
+	_, _, err = c.TestAndSet("foo", "", "barbar", 100)
+
+	if err == nil {
+		t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value")
+	}
+}
+

+ 205 - 0
tests/functional/util.go

@@ -0,0 +1,205 @@
+package test
+
+import (
+	"fmt"
+	"github.com/coreos/go-etcd/etcd"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"os"
+	"strconv"
+	"time"
+)
+
+var client = http.Client{
+	Transport: &http.Transport{
+		Dial: dialTimeoutFast,
+	},
+}
+
+// Sending set commands
+func Set(stop chan bool) {
+
+	stopSet := false
+	i := 0
+	c := etcd.NewClient(nil)
+	for {
+		key := fmt.Sprintf("%s_%v", "foo", i)
+
+		result, err := c.Set(key, "bar", 0)
+
+		if err != nil || result.Key != "/"+key || result.Value != "bar" {
+			select {
+			case <-stop:
+				stopSet = true
+
+			default:
+			}
+		}
+
+		select {
+		case <-stop:
+			stopSet = true
+
+		default:
+		}
+
+		if stopSet {
+			break
+		}
+
+		i++
+	}
+	stop <- true
+}
+
+// Create a cluster of etcd nodes
+func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
+	argGroup := make([][]string, size)
+
+	sslServer1 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
+		"-serverCert=./fixtures/ca/server.crt",
+		"-serverKey=./fixtures/ca/server.key.insecure",
+	}
+
+	sslServer2 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
+		"-serverCert=./fixtures/ca/server2.crt",
+		"-serverKey=./fixtures/ca/server2.key.insecure",
+	}
+
+	for i := 0; i < size; i++ {
+		if i == 0 {
+			argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
+			if ssl {
+				argGroup[i] = append(argGroup[i], sslServer1...)
+			}
+		} else {
+			strI := strconv.Itoa(i + 1)
+			argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
+			if ssl {
+				argGroup[i] = append(argGroup[i], sslServer2...)
+			}
+		}
+	}
+
+	etcds := make([]*os.Process, size)
+
+	for i, _ := range etcds {
+		var err error
+		etcds[i], err = os.StartProcess(EtcdBinPath, append(argGroup[i], "-f"), procAttr)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		// TODOBP: Change this sleep to wait until the master is up.
+		// The problem is that if the master isn't up then the children
+		// have to retry. This retry can take upwards of 15 seconds
+		// which slows tests way down and some of them fail.
+		if i == 0 {
+			time.Sleep(time.Second * 2)
+		}
+	}
+
+	return argGroup, etcds, nil
+}
+
+// Destroy all the nodes in the cluster
+func DestroyCluster(etcds []*os.Process) error {
+	for _, etcd := range etcds {
+		err := etcd.Kill()
+		if err != nil {
+			panic(err.Error())
+		}
+		etcd.Release()
+	}
+	return nil
+}
+
+//
+func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
+	leaderMap := make(map[int]string)
+	baseAddrFormat := "http://0.0.0.0:400%d"
+
+	for {
+		knownLeader := "unknown"
+		dead := 0
+		var i int
+
+		for i = 0; i < size; i++ {
+			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
+
+			if err == nil {
+				leaderMap[i] = leader
+
+				if knownLeader == "unknown" {
+					knownLeader = leader
+				} else {
+					if leader != knownLeader {
+						break
+					}
+
+				}
+
+			} else {
+				dead++
+				if dead > allowDeadNum {
+					break
+				}
+			}
+
+		}
+
+		if i == size {
+			select {
+			case <-stop:
+				return
+			case <-leaderChan:
+				leaderChan <- knownLeader
+			default:
+				leaderChan <- knownLeader
+			}
+
+		}
+		if dead == 0 {
+			select {
+			case <-all:
+				all <- true
+			default:
+				all <- true
+			}
+		}
+
+		time.Sleep(time.Millisecond * 10)
+	}
+
+}
+
+func getLeader(addr string) (string, error) {
+
+	resp, err := client.Get(addr + "/v1/leader")
+
+	if err != nil {
+		return "", err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return "", fmt.Errorf("no leader")
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return "", err
+	}
+
+	return string(b), nil
+
+}
+
+// Dial with timeout
+func dialTimeoutFast(network, addr string) (net.Conn, error) {
+	return net.DialTimeout(network, addr, time.Millisecond*10)
+}