Pārlūkot izejas kodu

Merge pull request #1609 from yichengq/202

etcdserver: refactor sender
Yicheng Qin 11 gadi atpakaļ
vecāks
revīzija
9d19429993

+ 29 - 6
etcdserver/cluster.go

@@ -27,6 +27,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"sync"
 
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/pkg/flags"
@@ -46,6 +47,7 @@ type ClusterInfo interface {
 	// Members returns a slice of members sorted by their ID
 	Members() []*Member
 	Member(id types.ID) *Member
+	IsIDRemoved(id types.ID) bool
 }
 
 // Cluster is a list of Members that belong to the same raft cluster
@@ -57,6 +59,7 @@ type Cluster struct {
 	// removed id cannot be reused.
 	removed map[types.ID]bool
 	store   store.Store
+	sync.Mutex
 }
 
 // NewClusterFromString returns Cluster through given cluster token and parsing
@@ -111,9 +114,11 @@ func newCluster(token string) *Cluster {
 	}
 }
 
-func (c Cluster) ID() types.ID { return c.id }
+func (c *Cluster) ID() types.ID { return c.id }
 
-func (c Cluster) Members() []*Member {
+func (c *Cluster) Members() []*Member {
+	c.Lock()
+	defer c.Unlock()
 	var sms SortableMemberSlice
 	for _, m := range c.members {
 		sms = append(sms, m)
@@ -129,12 +134,16 @@ func (s SortableMemberSlice) Less(i, j int) bool { return s[i].ID < s[j].ID }
 func (s SortableMemberSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 
 func (c *Cluster) Member(id types.ID) *Member {
+	c.Lock()
+	defer c.Unlock()
 	return c.members[id]
 }
 
 // MemberByName returns a Member with the given name if exists.
 // If more than one member has the given name, it will panic.
 func (c *Cluster) MemberByName(name string) *Member {
+	c.Lock()
+	defer c.Unlock()
 	var memb *Member
 	for _, m := range c.members {
 		if m.Name == name {
@@ -147,7 +156,9 @@ func (c *Cluster) MemberByName(name string) *Member {
 	return memb
 }
 
-func (c Cluster) MemberIDs() []types.ID {
+func (c *Cluster) MemberIDs() []types.ID {
+	c.Lock()
+	defer c.Unlock()
 	var ids []types.ID
 	for _, m := range c.members {
 		ids = append(ids, m.ID)
@@ -157,13 +168,17 @@ func (c Cluster) MemberIDs() []types.ID {
 }
 
 func (c *Cluster) IsIDRemoved(id types.ID) bool {
+	c.Lock()
+	defer c.Unlock()
 	return c.removed[id]
 }
 
 // PeerURLs returns a list of all peer addresses. Each address is prefixed
 // with the scheme (currently "http://"). The returned list is sorted in
 // ascending lexicographical order.
-func (c Cluster) PeerURLs() []string {
+func (c *Cluster) PeerURLs() []string {
+	c.Lock()
+	defer c.Unlock()
 	endpoints := make([]string, 0)
 	for _, p := range c.members {
 		for _, addr := range p.PeerURLs {
@@ -177,7 +192,9 @@ func (c Cluster) PeerURLs() []string {
 // ClientURLs returns a list of all client addresses. Each address is prefixed
 // with the scheme (currently "http://"). The returned list is sorted in
 // ascending lexicographical order.
-func (c Cluster) ClientURLs() []string {
+func (c *Cluster) ClientURLs() []string {
+	c.Lock()
+	defer c.Unlock()
 	urls := make([]string, 0)
 	for _, p := range c.members {
 		for _, url := range p.ClientURLs {
@@ -188,7 +205,9 @@ func (c Cluster) ClientURLs() []string {
 	return urls
 }
 
-func (c Cluster) String() string {
+func (c *Cluster) String() string {
+	c.Lock()
+	defer c.Unlock()
 	sl := []string{}
 	for _, m := range c.members {
 		for _, u := range m.PeerURLs {
@@ -278,6 +297,8 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 // AddMember puts a new Member into the store.
 // A Member with a matching id must not exist.
 func (c *Cluster) AddMember(m *Member) {
+	c.Lock()
+	defer c.Unlock()
 	b, err := json.Marshal(m.RaftAttributes)
 	if err != nil {
 		log.Panicf("marshal raftAttributes should never fail: %v", err)
@@ -300,6 +321,8 @@ func (c *Cluster) AddMember(m *Member) {
 // RemoveMember removes a member from the store.
 // The given id MUST exist, or the function panics.
 func (c *Cluster) RemoveMember(id types.ID) {
+	c.Lock()
+	defer c.Unlock()
 	if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
 		log.Panicf("delete member should never fail: %v", err)
 	}

+ 1 - 0
etcdserver/etcdhttp/http_test.go

@@ -57,6 +57,7 @@ func (c *fakeCluster) Members() []*etcdserver.Member {
 	return []*etcdserver.Member(sms)
 }
 func (c *fakeCluster) Member(id types.ID) *etcdserver.Member { return c.members[uint64(id)] }
+func (c *fakeCluster) IsIDRemoved(id types.ID) bool          { return false }
 
 // errServer implements the etcd.Server interface for testing.
 // It returns the given error from any Do/Process/AddMember/RemoveMember calls.

+ 114 - 52
etcdserver/sender.go

@@ -28,42 +28,46 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
-const raftPrefix = "/raft"
+const (
+	raftPrefix    = "/raft"
+	connPerSender = 4
+)
+
+type sendHub struct {
+	tr      *http.Transport
+	cl      ClusterInfo
+	ss      *stats.ServerStats
+	ls      *stats.LeaderStats
+	senders map[types.ID]*sender
+}
 
-// Sender creates the default production sender used to transport raft messages
-// in the cluster. The returned sender will update the given ServerStats and
+// newSendHub creates the default send hub used to transport raft messages
+// to other members. The returned sendHub will update the given ServerStats and
 // LeaderStats appropriately.
-func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
-	c := &http.Client{Transport: t}
-
-	return func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			// TODO: reuse go routines
-			// limit the number of outgoing connections for the same receiver
-			go send(c, cl, m, ss, ls)
-		}
+func newSendHub(t *http.Transport, cl ClusterInfo, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub {
+	h := &sendHub{
+		tr:      t,
+		cl:      cl,
+		ss:      ss,
+		ls:      ls,
+		senders: make(map[types.ID]*sender),
 	}
+	for _, m := range cl.Members() {
+		h.Add(m)
+	}
+	return h
 }
 
-// send uses the given client to send a message to a member in the given
-// ClusterStore, retrying up to 3 times for each message. The given
-// ServerStats and LeaderStats are updated appropriately
-func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
-	to := types.ID(m.To)
-	cid := cl.ID()
-	// TODO (xiangli): reasonable retry logic
-	for i := 0; i < 3; i++ {
-		memb := cl.Member(to)
-		if memb == nil {
-			if !cl.IsIDRemoved(to) {
-				// TODO: unknown peer id.. what do we do? I
-				// don't think his should ever happen, need to
-				// look into this further.
-				log.Printf("etcdserver: error sending message to unknown receiver %s", to.String())
+func (h *sendHub) Send(msgs []raftpb.Message) {
+	for _, m := range msgs {
+		to := types.ID(m.To)
+		s, ok := h.senders[to]
+		if !ok {
+			if !h.cl.IsIDRemoved(to) {
+				log.Printf("etcdserver: send message to unknown receiver %s", to)
 			}
-			return
+			continue
 		}
-		u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
 
 		// TODO: don't block. we should be able to have 1000s
 		// of messages out at a time.
@@ -73,52 +77,110 @@ func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats,
 			return // drop bad message
 		}
 		if m.Type == raftpb.MsgApp {
-			ss.SendAppendReq(len(data))
+			h.ss.SendAppendReq(len(data))
 		}
-		fs := ls.Follower(to.String())
 
+		// TODO (xiangli): reasonable retry logic
+		s.send(data)
+	}
+}
+
+func (h *sendHub) Stop() {
+	for _, s := range h.senders {
+		s.stop()
+	}
+}
+
+func (h *sendHub) Add(m *Member) {
+	// TODO: considering how to switch between all available peer urls
+	u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix)
+	c := &http.Client{Transport: h.tr}
+	fs := h.ls.Follower(m.ID.String())
+	s := newSender(u, h.cl.ID(), c, fs)
+	h.senders[m.ID] = s
+}
+
+func (h *sendHub) Remove(id types.ID) {
+	h.senders[id].stop()
+	delete(h.senders, id)
+}
+
+type sender struct {
+	u   string
+	cid types.ID
+	c   *http.Client
+	fs  *stats.FollowerStats
+	q   chan []byte
+}
+
+func newSender(u string, cid types.ID, c *http.Client, fs *stats.FollowerStats) *sender {
+	s := &sender{
+		u:   u,
+		cid: cid,
+		c:   c,
+		fs:  fs,
+		q:   make(chan []byte),
+	}
+	for i := 0; i < connPerSender; i++ {
+		go s.handle()
+	}
+	return s
+}
+
+func (s *sender) send(data []byte) {
+	select {
+	case s.q <- data:
+	default:
+		log.Printf("sender: reach the maximal serving to %s", s.u)
+	}
+}
+
+func (s *sender) stop() {
+	close(s.q)
+}
+
+func (s *sender) handle() {
+	for d := range s.q {
 		start := time.Now()
-		sent := httpPost(c, u, cid, data)
+		err := s.post(d)
 		end := time.Now()
-		if sent {
-			fs.Succ(end.Sub(start))
-			return
+		if err != nil {
+			s.fs.Fail()
+			log.Printf("sender: %v", err)
+			continue
 		}
-		fs.Fail()
-		// TODO: backoff
+		s.fs.Succ(end.Sub(start))
 	}
 }
 
-// httpPost POSTs a data payload to a url using the given client. Returns true
-// if the POST succeeds, false on any failure.
-func httpPost(c *http.Client, url string, cid types.ID, data []byte) bool {
-	req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
+// post POSTs a data payload to a url. Returns nil if the POST succeeds,
+// error on any failure.
+func (s *sender) post(data []byte) error {
+	req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data))
 	if err != nil {
-		// TODO: log the error?
-		return false
+		return fmt.Errorf("new request to %s error: %v", s.u, err)
 	}
 	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Etcd-Cluster-ID", cid.String())
-	resp, err := c.Do(req)
+	req.Header.Set("X-Etcd-Cluster-ID", s.cid.String())
+	resp, err := s.c.Do(req)
 	if err != nil {
-		// TODO: log the error?
-		return false
+		return fmt.Errorf("do request %+v error: %v", req, err)
 	}
 	resp.Body.Close()
 
 	switch resp.StatusCode {
 	case http.StatusPreconditionFailed:
 		// TODO: shutdown the etcdserver gracefully?
-		log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), cid.String())
-		return false
+		log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid)
+		return nil
 	case http.StatusForbidden:
 		// TODO: stop the server
 		log.Println("etcd: this member has been permanently removed from the cluster")
 		log.Fatalln("etcd: the data-dir used by this member must be removed so that this host can be re-added with a new member ID")
-		return false
+		return nil
 	case http.StatusNoContent:
-		return true
+		return nil
 	default:
-		return false
+		return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
 	}
 }

+ 78 - 0
etcdserver/sender_test.go

@@ -0,0 +1,78 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package etcdserver
+
+import (
+	"testing"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+)
+
+func TestSendHubInitSenders(t *testing.T) {
+	membs := []Member{
+		newTestMember(1, []string{"http://a"}, "", nil),
+		newTestMember(2, []string{"http://b"}, "", nil),
+		newTestMember(3, []string{"http://c"}, "", nil),
+	}
+	cl := newTestCluster(membs)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(nil, cl, nil, ls)
+
+	ids := cl.MemberIDs()
+	if len(h.senders) != len(ids) {
+		t.Errorf("len(ids) = %d, want %d", len(h.senders), len(ids))
+	}
+	for _, id := range ids {
+		if _, ok := h.senders[id]; !ok {
+			t.Errorf("senders[%s] is nil, want exists", id)
+		}
+	}
+}
+
+func TestSendHubAdd(t *testing.T) {
+	cl := newTestCluster(nil)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(nil, cl, nil, ls)
+	m := newTestMemberp(1, []string{"http://a"}, "", nil)
+	h.Add(m)
+
+	if _, ok := ls.Followers["1"]; !ok {
+		t.Errorf("FollowerStats[1] is nil, want exists")
+	}
+	s, ok := h.senders[types.ID(1)]
+	if !ok {
+		t.Fatalf("senders[1] is nil, want exists")
+	}
+	if s.u != "http://a/raft" {
+		t.Errorf("url = %s, want %s", s.u, "http://a/raft")
+	}
+}
+
+func TestSendHubRemove(t *testing.T) {
+	membs := []Member{
+		newTestMember(1, []string{"http://a"}, "", nil),
+	}
+	cl := newTestCluster(membs)
+	ls := stats.NewLeaderStats("")
+	h := newSendHub(nil, cl, nil, ls)
+	h.Remove(types.ID(1))
+
+	if _, ok := h.senders[types.ID(1)]; ok {
+		t.Fatalf("senders[1] exists, want removed")
+	}
+}

+ 15 - 6
etcdserver/server.go

@@ -78,14 +78,19 @@ func init() {
 	rand.Seed(time.Now().UnixNano())
 }
 
-type sendFunc func(m []raftpb.Message)
-
 type Response struct {
 	Event   *store.Event
 	Watcher store.Watcher
 	err     error
 }
 
+type Sender interface {
+	Send(m []raftpb.Message)
+	Add(m *Member)
+	Remove(id types.ID)
+	Stop()
+}
+
 type Storage interface {
 	// Save function saves ents and state to the underlying stable storage.
 	// Save MUST block until st and ents are on stable storage.
@@ -156,11 +161,11 @@ type EtcdServer struct {
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
 
-	// send specifies the send function for sending msgs to members. send
+	// sender specifies the sender to send msgs to members. sending msgs
 	// MUST NOT block. It is okay to drop messages, since clients should
 	// timeout and reissue their messages.  If send is nil, server will
 	// panic.
-	send sendFunc
+	sender Sender
 
 	storage Storage
 
@@ -241,6 +246,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}
 	lstats := stats.NewLeaderStats(id.String())
 
+	shub := newSendHub(cfg.Transport, cfg.Cluster, sstats, lstats)
 	s := &EtcdServer{
 		store:      st,
 		node:       n,
@@ -253,7 +259,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		}{w, ss},
 		stats:      sstats,
 		lstats:     lstats,
-		send:       Sender(cfg.Transport, cfg.Cluster, sstats, lstats),
+		sender:     shub,
 		Ticker:     time.Tick(100 * time.Millisecond),
 		SyncTicker: time.Tick(500 * time.Millisecond),
 		snapCount:  cfg.SnapCount,
@@ -318,7 +324,7 @@ func (s *EtcdServer) run() {
 			if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
 				log.Fatalf("etcdserver: create snapshot error: %v", err)
 			}
-			s.send(rd.Messages)
+			s.sender.Send(rd.Messages)
 
 			// TODO(bmizerany): do this in the background, but take
 			// care to apply entries in a single goroutine, and not
@@ -361,6 +367,7 @@ func (s *EtcdServer) Stop() {
 	s.node.Stop()
 	close(s.done)
 	<-s.stopped
+	s.sender.Stop()
 }
 
 // Do interprets r and performs an operation on s.store according to r.Method
@@ -647,10 +654,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
 			log.Panicf("nodeID should always be equal to member ID")
 		}
 		s.Cluster.AddMember(m)
+		s.sender.Add(m)
 		log.Printf("etcdserver: added node %s to cluster", types.ID(cc.NodeID))
 	case raftpb.ConfChangeRemoveNode:
 		id := types.ID(cc.NodeID)
 		s.Cluster.RemoveMember(id)
+		s.sender.Remove(id)
 		log.Printf("etcdserver: removed node %s from cluster", id)
 	}
 	return nil

+ 28 - 39
etcdserver/server_test.go

@@ -487,19 +487,25 @@ func TestApplyConfChangeError(t *testing.T) {
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 
+type fakeSender struct {
+	ss []*EtcdServer
+}
+
+func (s *fakeSender) Send(msgs []raftpb.Message) {
+	for _, m := range msgs {
+		s.ss[m.To-1].node.Step(context.TODO(), m)
+	}
+}
+func (s *fakeSender) Add(m *Member)      {}
+func (s *fakeSender) Remove(id types.ID) {}
+func (s *fakeSender) Stop()              {}
+
 func testServer(t *testing.T, ns uint64) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	ss := make([]*EtcdServer, ns)
 
-	send := func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			t.Logf("m = %+v\n", m)
-			ss[m.To-1].node.Step(ctx, m)
-		}
-	}
-
 	ids := make([]uint64, ns)
 	for i := uint64(0); i < ns; i++ {
 		ids[i] = i + 1
@@ -516,7 +522,7 @@ func testServer(t *testing.T, ns uint64) {
 		srv := &EtcdServer{
 			node:    n,
 			store:   st,
-			send:    send,
+			sender:  &fakeSender{ss},
 			storage: &storageRecorder{},
 			Ticker:  tk.C,
 			Cluster: cl,
@@ -585,7 +591,7 @@ func TestDoProposal(t *testing.T) {
 		srv := &EtcdServer{
 			node:    n,
 			store:   st,
-			send:    func(_ []raftpb.Message) {},
+			sender:  &nopSender{},
 			storage: &storageRecorder{},
 			Ticker:  tk,
 			Cluster: cl,
@@ -668,7 +674,7 @@ func TestDoProposalStopped(t *testing.T) {
 		// TODO: use fake node for better testability
 		node:    n,
 		store:   st,
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		Ticker:  tk,
 	}
@@ -778,7 +784,7 @@ func TestSyncTrigger(t *testing.T) {
 	srv := &EtcdServer{
 		node:       n,
 		store:      &storeRecorder{},
-		send:       func(_ []raftpb.Message) {},
+		sender:     &nopSender{},
 		storage:    &storageRecorder{},
 		SyncTicker: st,
 	}
@@ -852,7 +858,7 @@ func TestTriggerSnap(t *testing.T) {
 	cl.SetStore(store.New())
 	s := &EtcdServer{
 		store:     st,
-		send:      func(_ []raftpb.Message) {},
+		sender:    &nopSender{},
 		storage:   p,
 		node:      n,
 		snapCount: 10,
@@ -886,7 +892,7 @@ func TestRecvSnapshot(t *testing.T) {
 	p := &storageRecorder{}
 	s := &EtcdServer{
 		store:   st,
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: p,
 		node:    n,
 	}
@@ -914,7 +920,7 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	st := &storeRecorder{}
 	s := &EtcdServer{
 		store:   st,
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		node:    n,
 	}
@@ -949,7 +955,7 @@ func TestAddMember(t *testing.T) {
 	s := &EtcdServer{
 		node:    n,
 		store:   &storeRecorder{},
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		Cluster: cl,
 	}
@@ -984,7 +990,7 @@ func TestRemoveMember(t *testing.T) {
 	s := &EtcdServer{
 		node:    n,
 		store:   &storeRecorder{},
-		send:    func(_ []raftpb.Message) {},
+		sender:  &nopSender{},
 		storage: &storageRecorder{},
 		Cluster: cl,
 	}
@@ -1052,6 +1058,7 @@ func TestPublish(t *testing.T) {
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
 		node:    &nodeRecorder{},
+		sender:  &nopSender{},
 		Cluster: &Cluster{},
 		w:       &waitRecorder{},
 		done:    make(chan struct{}),
@@ -1387,30 +1394,12 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
 }
 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
-type clusterStoreRecorder struct {
-	recorder
-}
-
-func (cs *clusterStoreRecorder) Add(m Member) {
-	cs.record(action{name: "Add", params: []interface{}{m}})
-}
-func (cs *clusterStoreRecorder) Get() Cluster {
-	cs.record(action{name: "Get"})
-	return Cluster{}
-}
-func (cs *clusterStoreRecorder) Remove(id uint64) {
-	cs.record(action{name: "Remove", params: []interface{}{id}})
-}
-func (cs *clusterStoreRecorder) IsRemoved(id uint64) bool { return false }
-
-type removedClusterStore struct {
-	removed map[uint64]bool
-}
+type nopSender struct{}
 
-func (cs *removedClusterStore) Add(m Member)             {}
-func (cs *removedClusterStore) Get() Cluster             { return Cluster{} }
-func (cs *removedClusterStore) Remove(id uint64)         {}
-func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] }
+func (s *nopSender) Send(m []raftpb.Message) {}
+func (s *nopSender) Add(m *Member)           {}
+func (s *nopSender) Remove(id types.ID)      {}
+func (s *nopSender) Stop()                   {}
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 	peers := make([]raft.Peer, len(ids))

+ 2 - 2
integration/cluster_test.go

@@ -36,7 +36,7 @@ import (
 )
 
 const (
-	tickDuration = 5 * time.Millisecond
+	tickDuration = 10 * time.Millisecond
 	clusterName  = "etcd"
 )
 
@@ -181,7 +181,7 @@ func (m *member) Launch(t *testing.T) {
 		t.Fatalf("failed to initialize the etcd server: %v", err)
 	}
 	m.s.Ticker = time.Tick(tickDuration)
-	m.s.SyncTicker = time.Tick(tickDuration)
+	m.s.SyncTicker = time.Tick(10 * tickDuration)
 	m.s.Start()
 
 	for _, ln := range m.PeerListeners {