Browse Source

Merge pull request #2290 from xiang90/fix_transport

etcdserver: recover transport when recovering from a snapshot
Xiang Li 11 years ago
parent
commit
b31109cfd7
3 changed files with 25 additions and 0 deletions
  1. 10 0
      etcdserver/server.go
  2. 1 0
      etcdserver/server_test.go
  3. 14 0
      rafthttp/transport.go

+ 10 - 0
etcdserver/server.go

@@ -392,6 +392,16 @@ func (s *EtcdServer) run() {
 					log.Panicf("recovery store error: %v", err)
 					log.Panicf("recovery store error: %v", err)
 				}
 				}
 				s.Cluster.Recover()
 				s.Cluster.Recover()
+
+				// recover raft transport
+				s.r.transport.RemoveAllPeers()
+				for _, m := range s.Cluster.Members() {
+					if m.ID == s.ID() {
+						continue
+					}
+					s.r.transport.AddPeer(m.ID, m.PeerURLs)
+				}
+
 				appliedi = rd.Snapshot.Metadata.Index
 				appliedi = rd.Snapshot.Metadata.Index
 				confState = rd.Snapshot.Metadata.ConfState
 				confState = rd.Snapshot.Metadata.ConfState
 				log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
 				log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)

+ 1 - 0
etcdserver/server_test.go

@@ -1393,6 +1393,7 @@ func (s *nopTransporter) Handler() http.Handler               { return nil }
 func (s *nopTransporter) Send(m []raftpb.Message)             {}
 func (s *nopTransporter) Send(m []raftpb.Message)             {}
 func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
 func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
 func (s *nopTransporter) RemovePeer(id types.ID)              {}
 func (s *nopTransporter) RemovePeer(id types.ID)              {}
+func (s *nopTransporter) RemoveAllPeers()                     {}
 func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
 func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
 func (s *nopTransporter) Stop()                               {}
 func (s *nopTransporter) Stop()                               {}
 func (s *nopTransporter) Pause()                              {}
 func (s *nopTransporter) Pause()                              {}

+ 14 - 0
rafthttp/transport.go

@@ -37,6 +37,7 @@ type Transporter interface {
 	Send(m []raftpb.Message)
 	Send(m []raftpb.Message)
 	AddPeer(id types.ID, urls []string)
 	AddPeer(id types.ID, urls []string)
 	RemovePeer(id types.ID)
 	RemovePeer(id types.ID)
+	RemoveAllPeers()
 	UpdatePeer(id types.ID, urls []string)
 	UpdatePeer(id types.ID, urls []string)
 	Stop()
 	Stop()
 }
 }
@@ -132,6 +133,19 @@ func (t *transport) AddPeer(id types.ID, urls []string) {
 func (t *transport) RemovePeer(id types.ID) {
 func (t *transport) RemovePeer(id types.ID) {
 	t.mu.Lock()
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	defer t.mu.Unlock()
+	t.removePeer(id)
+}
+
+func (t *transport) RemoveAllPeers() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	for id, _ := range t.peers {
+		t.removePeer(id)
+	}
+}
+
+// the caller of this function must have the peers mutex.
+func (t *transport) removePeer(id types.ID) {
 	if peer, ok := t.peers[id]; ok {
 	if peer, ok := t.peers[id]; ok {
 		peer.Stop()
 		peer.Stop()
 	} else {
 	} else {