瀏覽代碼

Merge pull request #1623 from xiangli-cmu/valid_configuration

Valid configuration
Xiang Li 11 年之前
父節點
當前提交
6fdbb086f4
共有 4 個文件被更改,包括 81 次插入68 次删除
  1. 54 27
      etcdserver/cluster.go
  2. 4 5
      etcdserver/cluster_test.go
  3. 5 24
      etcdserver/server.go
  4. 18 12
      etcdserver/server_test.go

+ 54 - 27
etcdserver/cluster.go

@@ -31,6 +31,7 @@ import (
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/pkg/flags"
 	"github.com/coreos/etcd/pkg/flags"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 )
 )
 
 
@@ -89,33 +90,7 @@ func NewClusterFromString(token string, cluster string) (*Cluster, error) {
 func NewClusterFromStore(token string, st store.Store) *Cluster {
 func NewClusterFromStore(token string, st store.Store) *Cluster {
 	c := newCluster(token)
 	c := newCluster(token)
 	c.store = st
 	c.store = st
-
-	e, err := c.store.Get(storeMembersPrefix, true, true)
-	if err != nil {
-		if isKeyNotFound(err) {
-			return c
-		}
-		log.Panicf("get storeMembers should never fail: %v", err)
-	}
-	for _, n := range e.Node.Nodes {
-		m, err := nodeToMember(n)
-		if err != nil {
-			log.Panicf("nodeToMember should never fail: %v", err)
-		}
-		c.members[m.ID] = m
-	}
-
-	e, err = c.store.Get(storeRemovedMembersPrefix, true, true)
-	if err != nil {
-		if isKeyNotFound(err) {
-			return c
-		}
-		log.Panicf("get storeRemovedMembers should never fail: %v", err)
-	}
-	for _, n := range e.Node.Nodes {
-		c.removed[mustParseMemberIDFromKey(n.Key)] = true
-	}
-
+	c.members, c.removed = membersFromStore(c.store)
 	return c
 	return c
 }
 }
 
 
@@ -265,6 +240,27 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
 
 
 func (c *Cluster) SetStore(st store.Store) { c.store = st }
 func (c *Cluster) SetStore(st store.Store) { c.store = st }
 
 
+func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
+	appliedMembers, appliedRemoved := membersFromStore(c.store)
+
+	if appliedRemoved[types.ID(cc.NodeID)] {
+		return ErrIDRemoved
+	}
+	switch cc.Type {
+	case raftpb.ConfChangeAddNode:
+		if appliedMembers[types.ID(cc.NodeID)] != nil {
+			return ErrIDExists
+		}
+	case raftpb.ConfChangeRemoveNode:
+		if appliedMembers[types.ID(cc.NodeID)] == nil {
+			return ErrIDNotFound
+		}
+	default:
+		log.Panicf("ConfChange type should be either AddNode or RemoveNode")
+	}
+	return nil
+}
+
 // AddMember puts a new Member into the store.
 // AddMember puts a new Member into the store.
 // A Member with a matching id must not exist.
 // A Member with a matching id must not exist.
 func (c *Cluster) AddMember(m *Member) {
 func (c *Cluster) AddMember(m *Member) {
@@ -322,6 +318,37 @@ func nodeToMember(n *store.NodeExtern) (*Member, error) {
 	return m, nil
 	return m, nil
 }
 }
 
 
+func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
+	members := make(map[types.ID]*Member)
+	removed := make(map[types.ID]bool)
+	e, err := st.Get(storeMembersPrefix, true, true)
+	if err != nil {
+		if isKeyNotFound(err) {
+			return members, removed
+		}
+		log.Panicf("get storeMembers should never fail: %v", err)
+	}
+	for _, n := range e.Node.Nodes {
+		m, err := nodeToMember(n)
+		if err != nil {
+			log.Panicf("nodeToMember should never fail: %v", err)
+		}
+		members[m.ID] = m
+	}
+
+	e, err = st.Get(storeRemovedMembersPrefix, true, true)
+	if err != nil {
+		if isKeyNotFound(err) {
+			return members, removed
+		}
+		log.Panicf("get storeRemovedMembers should never fail: %v", err)
+	}
+	for _, n := range e.Node.Nodes {
+		removed[mustParseMemberIDFromKey(n.Key)] = true
+	}
+	return members, removed
+}
+
 func isKeyNotFound(err error) bool {
 func isKeyNotFound(err error) bool {
 	e, ok := err.(*etcdErr.Error)
 	e, ok := err.(*etcdErr.Error)
 	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
 	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound

+ 4 - 5
etcdserver/cluster_test.go

@@ -93,13 +93,11 @@ func TestClusterFromStore(t *testing.T) {
 		},
 		},
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		st := store.New()
 		hc := newTestCluster(nil)
 		hc := newTestCluster(nil)
-		hc.SetStore(st)
 		for _, m := range tt.mems {
 		for _, m := range tt.mems {
 			hc.AddMember(&m)
 			hc.AddMember(&m)
 		}
 		}
-		c := NewClusterFromStore("abc", st)
+		c := NewClusterFromStore("abc", hc.store)
 		if c.token != "abc" {
 		if c.token != "abc" {
 			t.Errorf("#%d: token = %v, want %v", i, c.token, "abc")
 			t.Errorf("#%d: token = %v, want %v", i, c.token, "abc")
 		}
 		}
@@ -535,8 +533,9 @@ func TestNodeToMember(t *testing.T) {
 
 
 func newTestCluster(membs []Member) *Cluster {
 func newTestCluster(membs []Member) *Cluster {
 	c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
 	c := &Cluster{members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
-	for i, m := range membs {
-		c.members[m.ID] = &membs[i]
+	c.store = store.New()
+	for i := range membs {
+		c.AddMember(&membs[i])
 	}
 	}
 	return c
 	return c
 }
 }

+ 5 - 24
etcdserver/server.go

@@ -328,7 +328,7 @@ func (s *EtcdServer) run() {
 			// race them.
 			// race them.
 			// TODO: apply configuration change into ClusterStore.
 			// TODO: apply configuration change into ClusterStore.
 			if len(rd.CommittedEntries) != 0 {
 			if len(rd.CommittedEntries) != 0 {
-				appliedi = s.apply(rd.CommittedEntries, nodes)
+				appliedi = s.apply(rd.CommittedEntries)
 			}
 			}
 
 
 			if rd.Snapshot.Index > snapi {
 			if rd.Snapshot.Index > snapi {
@@ -559,7 +559,7 @@ func getExpirationTime(r *pb.Request) time.Time {
 	return t
 	return t
 }
 }
 
 
-func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 {
+func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
 	var applied uint64
 	var applied uint64
 	for i := range es {
 	for i := range es {
 		e := es[i]
 		e := es[i]
@@ -571,7 +571,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, nodes []uint64) uint64 {
 		case raftpb.EntryConfChange:
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			pbutil.MustUnmarshal(&cc, e.Data)
-			s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes))
+			s.w.Trigger(cc.ID, s.applyConfChange(cc))
 		default:
 		default:
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
 		}
 		}
@@ -633,8 +633,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 	}
 	}
 }
 }
 
 
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error {
-	if err := s.checkConfChange(cc, nodes); err != nil {
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) error {
+	if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		cc.NodeID = raft.None
 		s.node.ApplyConfChange(cc)
 		s.node.ApplyConfChange(cc)
 		return err
 		return err
@@ -659,25 +659,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
 	return nil
 	return nil
 }
 }
 
 
-func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
-	if s.Cluster.IsIDRemoved(types.ID(cc.NodeID)) {
-		return ErrIDRemoved
-	}
-	switch cc.Type {
-	case raftpb.ConfChangeAddNode:
-		if containsUint64(nodes, cc.NodeID) {
-			return ErrIDExists
-		}
-	case raftpb.ConfChangeRemoveNode:
-		if !containsUint64(nodes, cc.NodeID) {
-			return ErrIDNotFound
-		}
-	default:
-		log.Panicf("ConfChange type should be either AddNode or RemoveNode")
-	}
-	return nil
-}
-
 // TODO: non-blocking snapshot
 // TODO: non-blocking snapshot
 func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 	d, err := s.store.Save()
 	d, err := s.store.Save()

+ 18 - 12
etcdserver/server_test.go

@@ -421,8 +421,13 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
 
 
 // TODO: test ErrIDRemoved
 // TODO: test ErrIDRemoved
 func TestApplyConfChangeError(t *testing.T) {
 func TestApplyConfChangeError(t *testing.T) {
-	nodes := []uint64{1, 2, 3}
-	removed := map[types.ID]bool{4: true}
+	cl := newCluster("")
+	cl.SetStore(store.New())
+	for i := 1; i <= 4; i++ {
+		cl.AddMember(&Member{ID: types.ID(i)})
+	}
+	cl.RemoveMember(4)
+
 	tests := []struct {
 	tests := []struct {
 		cc   raftpb.ConfChange
 		cc   raftpb.ConfChange
 		werr error
 		werr error
@@ -458,12 +463,11 @@ func TestApplyConfChangeError(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		n := &nodeRecorder{}
 		n := &nodeRecorder{}
-		cl := &Cluster{removed: removed}
 		srv := &EtcdServer{
 		srv := &EtcdServer{
 			node:    n,
 			node:    n,
 			Cluster: cl,
 			Cluster: cl,
 		}
 		}
-		err := srv.applyConfChange(tt.cc, nodes)
+		err := srv.applyConfChange(tt.cc)
 		if err != tt.werr {
 		if err != tt.werr {
 			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
 			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
 		}
 		}
@@ -506,11 +510,12 @@ func testServer(t *testing.T, ns uint64) {
 		n := raft.StartNode(id, members, 10, 1)
 		n := raft.StartNode(id, members, 10, 1)
 		tk := time.NewTicker(10 * time.Millisecond)
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
 		defer tk.Stop()
+		st := store.New()
 		cl := newCluster("abc")
 		cl := newCluster("abc")
-		cl.SetStore(&storeRecorder{})
+		cl.SetStore(st)
 		srv := &EtcdServer{
 		srv := &EtcdServer{
 			node:    n,
 			node:    n,
-			store:   store.New(),
+			store:   st,
 			send:    send,
 			send:    send,
 			storage: &storageRecorder{},
 			storage: &storageRecorder{},
 			Ticker:  tk.C,
 			Ticker:  tk.C,
@@ -536,8 +541,8 @@ func testServer(t *testing.T, ns uint64) {
 
 
 		g, w := resp.Event.Node, &store.NodeExtern{
 		g, w := resp.Event.Node, &store.NodeExtern{
 			Key:           "/foo",
 			Key:           "/foo",
-			ModifiedIndex: uint64(i),
-			CreatedIndex:  uint64(i),
+			ModifiedIndex: uint64(i) + 2*ns,
+			CreatedIndex:  uint64(i) + 2*ns,
 			Value:         stringp("bar"),
 			Value:         stringp("bar"),
 		}
 		}
 
 
@@ -576,7 +581,7 @@ func TestDoProposal(t *testing.T) {
 		// this makes <-tk always successful, which accelerates internal clock
 		// this makes <-tk always successful, which accelerates internal clock
 		close(tk)
 		close(tk)
 		cl := newCluster("abc")
 		cl := newCluster("abc")
-		cl.SetStore(&storeRecorder{})
+		cl.SetStore(store.New())
 		srv := &EtcdServer{
 		srv := &EtcdServer{
 			node:    n,
 			node:    n,
 			store:   st,
 			store:   st,
@@ -833,13 +838,15 @@ func TestTriggerSnap(t *testing.T) {
 	n.Campaign(ctx)
 	n.Campaign(ctx)
 	st := &storeRecorder{}
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	p := &storageRecorder{}
+	cl := newCluster("abc")
+	cl.SetStore(store.New())
 	s := &EtcdServer{
 	s := &EtcdServer{
 		store:     st,
 		store:     st,
 		send:      func(_ []raftpb.Message) {},
 		send:      func(_ []raftpb.Message) {},
 		storage:   p,
 		storage:   p,
 		node:      n,
 		node:      n,
 		snapCount: 10,
 		snapCount: 10,
-		Cluster:   &Cluster{},
+		Cluster:   cl,
 	}
 	}
 
 
 	s.start()
 	s.start()
@@ -928,7 +935,7 @@ func TestAddMember(t *testing.T) {
 		},
 		},
 	}
 	}
 	cl := newTestCluster(nil)
 	cl := newTestCluster(nil)
-	cl.SetStore(&storeRecorder{})
+	cl.SetStore(store.New())
 	s := &EtcdServer{
 	s := &EtcdServer{
 		node:    n,
 		node:    n,
 		store:   &storeRecorder{},
 		store:   &storeRecorder{},
@@ -964,7 +971,6 @@ func TestRemoveMember(t *testing.T) {
 		},
 		},
 	}
 	}
 	cl := newTestCluster([]Member{{ID: 1234}})
 	cl := newTestCluster([]Member{{ID: 1234}})
-	cl.SetStore(&storeRecorder{})
 	s := &EtcdServer{
 	s := &EtcdServer{
 		node:    n,
 		node:    n,
 		store:   &storeRecorder{},
 		store:   &storeRecorder{},