Browse Source

etcd: make retry configurable

Xiang Li 11 years ago
parent
commit
e9a22d0f34
3 changed files with 22 additions and 10 deletions
  1. 1 1
      etcd/etcd.go
  2. 3 0
      etcd/etcd_test.go
  3. 18 9
      etcd/participant.go

+ 1 - 1
etcd/etcd.go

@@ -175,7 +175,7 @@ func (s *Server) Run() error {
 	for {
 		switch next {
 		case participantMode:
-			p, err := newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.cfg.DataDir, s.client, s.peerHub, s.tickDuration)
+			p, err := newParticipant(s.id, s.cfg, s.client, s.peerHub, s.tickDuration)
 			if err != nil {
 				log.Printf("id=%x server.run newParicipanteErr=\"%v\"\n", s.id, err)
 				exit = err

+ 3 - 0
etcd/etcd_test.go

@@ -471,6 +471,8 @@ func (s *testServer) Start() {
 
 	e.pubAddr = s.URL
 	e.raftPubAddr = s.URL
+	e.cfg.Addr = s.URL
+	e.cfg.Peer.Addr = s.URL
 	go e.Run()
 }
 
@@ -547,6 +549,7 @@ func newTestConfig() *conf.Config {
 	c.Peer.Addr = "127.0.0.1:0"
 	c.Peer.HeartbeatInterval = 5
 	c.Peer.ElectionTimeout = 25
+	c.RetryInterval = 1 / 10.0
 	dataDir, err := ioutil.TempDir(os.TempDir(), "etcd")
 	if err != nil {
 		panic(err)

+ 18 - 9
etcd/participant.go

@@ -26,6 +26,7 @@ import (
 	"path"
 	"time"
 
+	"github.com/coreos/etcd/conf"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/store"
@@ -64,6 +65,7 @@ var (
 type participant struct {
 	id           int64
 	clusterId    int64
+	cfg          *conf.Config
 	pubAddr      string
 	raftPubAddr  string
 	tickDuration time.Duration
@@ -85,9 +87,10 @@ type participant struct {
 	*http.ServeMux
 }
 
-func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
+func newParticipant(id int64, c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
 	p := &participant{
 		clusterId:    -1,
+		cfg:          c,
 		tickDuration: tickDuration,
 
 		client:  client,
@@ -109,7 +112,7 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 	p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
 	p.peerHub.setServerStats(p.serverStats)
 
-	walPath := path.Join(dir, "wal")
+	walPath := path.Join(p.cfg.DataDir, "wal")
 	w, err := wal.Open(walPath)
 	if err != nil {
 		if !os.IsNotExist(err) {
@@ -117,8 +120,8 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, cl
 		}
 
 		p.id = id
-		p.pubAddr = pubAddr
-		p.raftPubAddr = raftPubAddr
+		p.pubAddr = c.Addr
+		p.raftPubAddr = c.Peer.Addr
 		if w, err = wal.New(walPath); err != nil {
 			return nil, err
 		}
@@ -168,7 +171,9 @@ func (p *participant) run(stop chan struct{}) {
 			p.apply(p.node.Next())
 		} else {
 			log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
-			p.join()
+			if err := p.join(); err != nil {
+				log.Fatalf("id=%x participant.join err=%q", p.id, err)
+			}
 		}
 	}
 
@@ -411,7 +416,7 @@ func (p *participant) send(msgs []raft.Message) {
 	}
 }
 
-func (p *participant) join() {
+func (p *participant) join() error {
 	info := &context{
 		MinVersion: store.MinVersion(),
 		MaxVersion: store.MaxVersion(),
@@ -419,15 +424,19 @@ func (p *participant) join() {
 		PeerURL:    p.raftPubAddr,
 	}
 
-	for {
+	max := p.cfg.MaxRetryAttempts
+	for attempt := 0; ; attempt++ {
 		for seed := range p.peerHub.getSeeds() {
 			if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
-				return
+				return nil
 			} else {
 				log.Printf("id=%x participant.join addMachineErr=\"%v\"\n", p.id, err)
 			}
 		}
-		time.Sleep(100 * time.Millisecond)
+		if attempt == max {
+			return fmt.Errorf("etcd: cannot join cluster after %d attempts", max)
+		}
+		time.Sleep(time.Millisecond * time.Duration(p.cfg.RetryInterval*1000))
 	}
 }