
etcdserver: do not apply stale conf changes in cluster and transport

Yicheng Qin · 10 years ago · commit 40197f0698
5 changed files with 110 additions and 49 deletions
  1. etcdserver/cluster.go (+48 -7)
  2. etcdserver/cluster_test.go (+16 -8)
  3. etcdserver/server.go (+8 -24)
  4. etcdserver/server_test.go (+27 -10)
  5. rafthttp/transport.go (+11 -0)

+ 48 - 7
etcdserver/cluster.go

@@ -30,6 +30,7 @@ import (
 	"github.com/coreos/etcd/pkg/netutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/store"
 )
 
@@ -66,6 +67,11 @@ type Cluster struct {
 	// TODO: upgrade it as last modified index
 	index uint64
 
+	// transport and members maintain the view of the cluster at index.
+	// This may be more up to date than what is stored in the store, since
+	// index may be higher than the store's index; this can happen when the
+	// cluster is updated from remote cluster info.
+	transport  rafthttp.Transporter
 	sync.Mutex // guards members and removed map
 	members    map[types.ID]*Member
 	// removed contains the ids of removed members in the cluster.
@@ -240,6 +246,19 @@ func (c *Cluster) UpdateIndex(index uint64) { c.index = index }
 
 func (c *Cluster) Recover() {
 	c.members, c.removed = membersFromStore(c.store)
+	// recover transport
+	c.transport.RemoveAllPeers()
+	for _, m := range c.Members() {
+		c.transport.AddPeer(m.ID, m.PeerURLs)
+	}
+}
+
+func (c *Cluster) SetTransport(tr rafthttp.Transporter) {
+	c.transport = tr
+	// add all members into the transport (the transport itself skips the local ID)
+	for _, m := range c.Members() {
+		c.transport.AddPeer(m.ID, m.PeerURLs)
+	}
 }
 
 // ValidateConfigurationChange takes a proposed ConfChange and
@@ -305,7 +324,8 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 // AddMember adds a new Member into the cluster, and saves the given member's
 // raftAttributes into the store. The given member should have empty attributes.
 // A Member with a matching id must not exist.
-func (c *Cluster) AddMember(m *Member) {
+// The given index indicates when the event happens.
+func (c *Cluster) AddMember(m *Member, index uint64) {
 	c.Lock()
 	defer c.Unlock()
 	b, err := json.Marshal(m.RaftAttributes)
@@ -316,22 +336,37 @@ func (c *Cluster) AddMember(m *Member) {
 	if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
 		log.Panicf("create raftAttributes should never fail: %v", err)
 	}
-	c.members[m.ID] = m
+	if index > c.index {
+		// TODO: check that the member does not already exist in the cluster.
+		// A newly bootstrapped member starts with an initial cluster that
+		// contains peers which have not been added yet.
+		c.members[m.ID] = m
+		c.transport.AddPeer(m.ID, m.PeerURLs)
+		c.index = index
+	}
 }
 
 // RemoveMember removes a member from the store.
 // The given id MUST exist, or the function panics.
-func (c *Cluster) RemoveMember(id types.ID) {
+// The given index indicates when the event happens.
+func (c *Cluster) RemoveMember(id types.ID, index uint64) {
 	c.Lock()
 	defer c.Unlock()
 	if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
 		log.Panicf("delete member should never fail: %v", err)
 	}
-	delete(c.members, id)
 	if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
 		log.Panicf("create removedMember should never fail: %v", err)
 	}
-	c.removed[id] = true
+	if index > c.index {
+		if _, ok := c.members[id]; !ok {
+			log.Panicf("member %s should exist in the cluster", id)
+		}
+		delete(c.members, id)
+		c.removed[id] = true
+		c.transport.RemovePeer(id)
+		c.index = index
+	}
 }
 
 func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
@@ -341,7 +376,9 @@ func (c *Cluster) UpdateAttributes(id types.ID, attr Attributes) {
 	// TODO: update store in this function
 }
 
-func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
+// UpdateRaftAttributes updates the raft attributes of the given id.
+// The given index indicates when the event happens.
+func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, index uint64) {
 	c.Lock()
 	defer c.Unlock()
 	b, err := json.Marshal(raftAttr)
@@ -352,7 +389,11 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
 	if _, err := c.store.Update(p, string(b), store.Permanent); err != nil {
 		log.Panicf("update raftAttributes should never fail: %v", err)
 	}
-	c.members[id].RaftAttributes = raftAttr
+	if index > c.index {
+		c.members[id].RaftAttributes = raftAttr
+		c.transport.UpdatePeer(id, raftAttr.PeerURLs)
+		c.index = index
+	}
 }
 
 // Validate ensures that there is no identical urls in the cluster peer list

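Taken together, the cluster.go changes make every membership mutation carry the raft index of the conf-change entry that produced it. Cluster applies the mutation to its member map and to the transport only when that index is newer than Cluster.index; otherwise the entry is treated as stale, i.e. already reflected in a view that was recovered from a snapshot or built from remote cluster info. A minimal, self-contained sketch of that guard idiom (hypothetical names, not the real etcd types):

    package main

    import "fmt"

    // clusterView is a trimmed-down stand-in for etcdserver.Cluster, written only
    // to illustrate the index guard added above; it is not etcd code.
    type clusterView struct {
        index   uint64            // raft index the view is consistent with
        members map[uint64]string // member ID -> peer URL
    }

    // addMember mirrors the `if index > c.index` checks in cluster.go: the conf
    // change is applied only when it is newer than the current view.
    func (c *clusterView) addMember(id uint64, peerURL string, index uint64) {
        if index <= c.index {
            // Stale entry, e.g. one the recovered view already reflects; ignore it.
            return
        }
        c.members[id] = peerURL
        c.index = index
    }

    func main() {
        c := &clusterView{index: 5, members: map[uint64]string{}}
        c.addMember(1, "http://127.0.0.1:2380", 3) // stale: 3 <= 5, skipped
        c.addMember(2, "http://127.0.0.1:2381", 7) // newer: applied, index -> 7
        fmt.Println(len(c.members), c.index)       // prints: 1 7
    }
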
+ 16 - 8
etcdserver/cluster_test.go

@@ -96,8 +96,9 @@ func TestClusterFromStore(t *testing.T) {
 	for i, tt := range tests {
 		hc := newTestCluster(nil)
 		hc.SetStore(store.New())
-		for _, m := range tt.mems {
-			hc.AddMember(m)
+		hc.SetTransport(&nopTransporter{})
+		for j, m := range tt.mems {
+			hc.AddMember(m, uint64(j))
 		}
 		c := NewClusterFromStore("abc", hc.store)
 		if c.token != "abc" {
@@ -357,11 +358,12 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
 func TestClusterValidateConfigurationChange(t *testing.T) {
 	cl := newCluster("")
 	cl.SetStore(store.New())
+	cl.SetTransport(&nopTransporter{})
 	for i := 1; i <= 4; i++ {
 		attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
-		cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr})
+		cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, uint64(i))
 	}
-	cl.RemoveMember(4)
+	cl.RemoveMember(4, 5)
 
 	attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
 	ctx, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
@@ -489,7 +491,8 @@ func TestClusterGenID(t *testing.T) {
 	previd := cs.ID()
 
 	cs.SetStore(&storeRecorder{})
-	cs.AddMember(newTestMember(3, nil, "", nil))
+	cs.SetTransport(&nopTransporter{})
+	cs.AddMember(newTestMember(3, nil, "", nil), 1)
 	cs.genID()
 	if cs.ID() == previd {
 		t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
@@ -532,7 +535,8 @@ func TestClusterAddMember(t *testing.T) {
 	st := &storeRecorder{}
 	c := newTestCluster(nil)
 	c.SetStore(st)
-	c.AddMember(newTestMember(1, nil, "node1", nil))
+	c.SetTransport(&nopTransporter{})
+	c.AddMember(newTestMember(1, nil, "node1", nil), 1)
 
 	wactions := []testutil.Action{
 		{
@@ -617,10 +621,14 @@ func TestClusterString(t *testing.T) {
 }
 
 func TestClusterRemoveMember(t *testing.T) {
-	st := &storeRecorder{}
 	c := newTestCluster(nil)
+	c.SetStore(&storeRecorder{})
+	c.SetTransport(&nopTransporter{})
+	c.AddMember(newTestMember(1, nil, "", nil), 1)
+
+	st := &storeRecorder{}
 	c.SetStore(st)
-	c.RemoveMember(1)
+	c.RemoveMember(1, 2)
 
 	wactions := []testutil.Action{
 		{Name: "Delete", Params: []interface{}{memberStoreKey(1), true, true}},

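Because AddMember, RemoveMember and UpdateRaftAttributes now touch the transport, the cluster tests have to attach one before mutating membership, hence the SetTransport(&nopTransporter{}) calls above. The definition of nopTransporter is not part of this diff; a plausible minimal shape, covering only the peer-management methods Cluster calls, might look like the following (an assumption: the real rafthttp.Transporter interface has further methods, e.g. for sending messages, that a complete stub must also implement):

    package etcdserver // in etcd this stub lives in the package's test files

    import "github.com/coreos/etcd/pkg/types"

    // nopTransporter ignores every peer operation, letting cluster tests run
    // without a real rafthttp transport.
    type nopTransporter struct{}

    func (t *nopTransporter) AddPeer(id types.ID, us []string)    {}
    func (t *nopTransporter) RemovePeer(id types.ID)              {}
    func (t *nopTransporter) RemoveAllPeers()                     {}
    func (t *nopTransporter) UpdatePeer(id types.ID, us []string) {}
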
+ 8 - 24
etcdserver/server.go

@@ -258,13 +258,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}
 
 	tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
-	// add all the remote members into sendhub
-	for _, m := range cfg.Cluster.Members() {
-		if m.ID != id {
-			tr.AddPeer(m.ID, m.PeerURLs)
-		}
-	}
 	srv.r.transport = tr
+	srv.Cluster.SetTransport(tr)
 	return srv, nil
 }
 
@@ -366,14 +361,6 @@ func (s *EtcdServer) run() {
 				// transport setting, which may block the communication.
 				if s.Cluster.index < apply.snapshot.Metadata.Index {
 					s.Cluster.Recover()
-					// recover raft transport
-					s.r.transport.RemoveAllPeers()
-					for _, m := range s.Cluster.Members() {
-						if m.ID == s.ID() {
-							continue
-						}
-						s.r.transport.AddPeer(m.ID, m.PeerURLs)
-					}
 				}
 
 				appliedi = apply.snapshot.Metadata.Index
@@ -671,7 +658,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
-			shouldstop, err = s.applyConfChange(cc, confState)
+			shouldstop, err = s.applyConfChange(cc, confState, e.Index)
 			s.w.Trigger(cc.ID, err)
 		default:
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
@@ -732,9 +719,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 	}
 }
 
-// applyConfChange applies a ConfChange to the server. It is only
-// invoked with a ConfChange that has already passed through Raft
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
+// applyConfChange applies a ConfChange to the server at the given index. It is only
+// invoked with a ConfChange that has already passed through Raft.
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, index uint64) (bool, error) {
 	if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		s.r.ApplyConfChange(cc)
@@ -750,20 +737,18 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if cc.NodeID != uint64(m.ID) {
 			log.Panicf("nodeID should always be equal to member ID")
 		}
-		s.Cluster.AddMember(m)
+		s.Cluster.AddMember(m, index)
 		if m.ID == s.id {
 			log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.r.transport.AddPeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
 		id := types.ID(cc.NodeID)
-		s.Cluster.RemoveMember(id)
+		s.Cluster.RemoveMember(id, index)
 		if id == s.id {
 			return true, nil
 		} else {
-			s.r.transport.RemovePeer(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeUpdateNode:
@@ -774,11 +759,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if cc.NodeID != uint64(m.ID) {
 			log.Panicf("nodeID should always be equal to member ID")
 		}
-		s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
+		s.Cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, index)
 		if m.ID == s.id {
 			log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	}

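On the server side the transport bookkeeping is gone entirely: NewServer hands the transporter to Cluster.SetTransport, snapshot recovery goes through Cluster.Recover, and applyConfChange only forwards the entry index (e.Index) into the cluster calls. Threading the index through matters because the raft log can replay conf-change entries that the cluster view already reflects, for example when the view was recovered from a snapshot or built from remote cluster info; with the guard, such entries become no-ops for both the member map and the transport peer set. A self-contained sketch of that replay scenario (hypothetical types, not etcd code):

    package main

    import "fmt"

    // fakeTransport and cluster are illustrative stand-ins for rafthttp.Transporter
    // and etcdserver.Cluster.
    type fakeTransport struct{ peers map[uint64]bool }

    func (t *fakeTransport) removePeer(id uint64) { delete(t.peers, id) }

    type cluster struct {
        index   uint64
        members map[uint64]bool
        tr      *fakeTransport
    }

    // removeMember mirrors Cluster.RemoveMember above: members and transport are
    // only touched when the conf-change entry is newer than the recovered view.
    func (c *cluster) removeMember(id, index uint64) {
        if index <= c.index {
            return // stale entry replayed from the log; already applied
        }
        delete(c.members, id)
        c.tr.removePeer(id)
        c.index = index
    }

    func main() {
        tr := &fakeTransport{peers: map[uint64]bool{1: true, 2: true}}
        // View recovered at index 10; member 3 had already been removed by then.
        c := &cluster{index: 10, members: map[uint64]bool{1: true, 2: true}, tr: tr}
        c.removeMember(3, 7)  // replayed entry below the recovered index: ignored
        c.removeMember(2, 12) // genuinely new entry: member and peer both removed
        fmt.Println(len(c.members), len(tr.peers), c.index) // prints: 1 1 12
    }
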
+ 27 - 10
etcdserver/server_test.go

@@ -413,10 +413,11 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
 func TestApplyConfChangeError(t *testing.T) {
 	cl := newCluster("")
 	cl.SetStore(store.New())
+	cl.SetTransport(&nopTransporter{})
 	for i := 1; i <= 4; i++ {
-		cl.AddMember(&Member{ID: types.ID(i)})
+		cl.AddMember(&Member{ID: types.ID(i)}, uint64(i))
 	}
-	cl.RemoveMember(4)
+	cl.RemoveMember(4, 5)
 
 	tests := []struct {
 		cc   raftpb.ConfChange
@@ -457,7 +458,7 @@ func TestApplyConfChangeError(t *testing.T) {
 			r:       raftNode{Node: n},
 			Cluster: cl,
 		}
-		_, err := srv.applyConfChange(tt.cc, nil)
+		_, err := srv.applyConfChange(tt.cc, nil, 10)
 		if err != tt.werr {
 			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
 		}
@@ -477,8 +478,9 @@ func TestApplyConfChangeError(t *testing.T) {
 func TestApplyConfChangeShouldStop(t *testing.T) {
 	cl := newCluster("")
 	cl.SetStore(store.New())
+	cl.SetTransport(&nopTransporter{})
 	for i := 1; i <= 3; i++ {
-		cl.AddMember(&Member{ID: types.ID(i)})
+		cl.AddMember(&Member{ID: types.ID(i)}, uint64(i))
 	}
 	srv := &EtcdServer{
 		id: 1,
@@ -493,7 +495,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 		NodeID: 2,
 	}
 	// remove non-local member
-	shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{})
+	shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, 10)
 	if err != nil {
 		t.Fatalf("unexpected error %v", err)
 	}
@@ -503,7 +505,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 
 	// remove local member
 	cc.NodeID = 1
-	shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{})
+	shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, 10)
 	if err != nil {
 		t.Fatalf("unexpected error %v", err)
 	}
@@ -774,6 +776,7 @@ func TestRecvSnapshot(t *testing.T) {
 	p := &storageRecorder{}
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
+	cl.SetTransport(&nopTransporter{})
 	s := &EtcdServer{
 		r: raftNode{
 			Node:        n,
@@ -808,6 +811,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	st := &storeRecorder{}
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
+	cl.SetTransport(&nopTransporter{})
 	storage := raft.NewMemoryStorage()
 	s := &EtcdServer{
 		r: raftNode{
@@ -853,6 +857,7 @@ func TestAddMember(t *testing.T) {
 	cl := newTestCluster(nil)
 	st := store.New()
 	cl.SetStore(st)
+	cl.SetTransport(&nopTransporter{})
 	s := &EtcdServer{
 		r: raftNode{
 			Node:        n,
@@ -891,7 +896,7 @@ func TestRemoveMember(t *testing.T) {
 	cl := newTestCluster(nil)
 	st := store.New()
 	cl.SetStore(store.New())
-	cl.AddMember(&Member{ID: 1234})
+	cl.SetTransport(&nopTransporter{})
 	s := &EtcdServer{
 		r: raftNode{
 			Node:        n,
@@ -904,6 +909,7 @@ func TestRemoveMember(t *testing.T) {
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
+	s.AddMember(context.TODO(), Member{ID: 1234})
 	err := s.RemoveMember(context.TODO(), 1234)
 	gaction := n.Action()
 	s.Stop()
@@ -911,7 +917,12 @@ func TestRemoveMember(t *testing.T) {
 	if err != nil {
 		t.Fatalf("RemoveMember error: %v", err)
 	}
-	wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
+	wactions := []testutil.Action{
+		{Name: "ProposeConfChange:ConfChangeAddNode"},
+		{Name: "ApplyConfChange:ConfChangeAddNode"},
+		{Name: "ProposeConfChange:ConfChangeRemoveNode"},
+		{Name: "ApplyConfChange:ConfChangeRemoveNode"},
+	}
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}
@@ -929,7 +940,7 @@ func TestUpdateMember(t *testing.T) {
 	cl := newTestCluster(nil)
 	st := store.New()
 	cl.SetStore(st)
-	cl.AddMember(&Member{ID: 1234})
+	cl.SetTransport(&nopTransporter{})
 	s := &EtcdServer{
 		r: raftNode{
 			Node:        n,
@@ -942,6 +953,7 @@ func TestUpdateMember(t *testing.T) {
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
+	s.AddMember(context.TODO(), Member{ID: 1234})
 	wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
 	err := s.UpdateMember(context.TODO(), wm)
 	gaction := n.Action()
@@ -950,7 +962,12 @@ func TestUpdateMember(t *testing.T) {
 	if err != nil {
 		t.Fatalf("UpdateMember error: %v", err)
 	}
-	wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
+	wactions := []testutil.Action{
+		{Name: "ProposeConfChange:ConfChangeAddNode"},
+		{Name: "ApplyConfChange:ConfChangeAddNode"},
+		{Name: "ProposeConfChange:ConfChangeUpdateNode"},
+		{Name: "ApplyConfChange:ConfChangeUpdateNode"},
+	}
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}

+ 11 - 0
rafthttp/transport.go

@@ -136,6 +136,11 @@ func (t *transport) Stop() {
 func (t *transport) AddPeer(id types.ID, us []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
+	// There is no need to build a connection to itself, because local messages
+	// are never sent through the transport.
+	if id == t.id {
+		return
+	}
 	if _, ok := t.peers[id]; ok {
 		return
 	}
@@ -150,6 +155,9 @@ func (t *transport) AddPeer(id types.ID, us []string) {
 func (t *transport) RemovePeer(id types.ID) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
+	if id == t.id {
+		return
+	}
 	t.removePeer(id)
 }
 
@@ -175,6 +183,9 @@ func (t *transport) removePeer(id types.ID) {
 func (t *transport) UpdatePeer(id types.ID, us []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
+	if id == t.id {
+		return
+	}
 	// TODO: return error or just panic?
 	if _, ok := t.peers[id]; !ok {
 		return