Browse Source

Merge pull request #1343 from unihorn/175

etcdserver: record removed member to check incoming message
Yicheng Qin 11 years ago
parent
commit
6b32395637

+ 30 - 5
etcdserver/cluster_store.go

@@ -43,6 +43,7 @@ type ClusterStore interface {
 	Add(m Member)
 	Get() Cluster
 	Remove(id uint64)
+	IsRemoved(id uint64) bool
 }
 
 type clusterStore struct {
@@ -59,7 +60,7 @@ func (s *clusterStore) Add(m Member) {
 	if err != nil {
 		log.Panicf("marshal error: %v", err)
 	}
-	if _, err := s.Store.Create(m.storeKey()+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
+	if _, err := s.Store.Create(memberStoreKey(m.ID)+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
 		log.Panicf("add raftAttributes should never fail: %v", err)
 	}
 
@@ -67,7 +68,7 @@ func (s *clusterStore) Add(m Member) {
 	if err != nil {
 		log.Panicf("marshal error: %v", err)
 	}
-	if _, err := s.Store.Create(m.storeKey()+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
+	if _, err := s.Store.Create(memberStoreKey(m.ID)+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
 		log.Panicf("add attributes should never fail: %v", err)
 	}
 }
@@ -79,7 +80,7 @@ func (s *clusterStore) Get() Cluster {
 	c.id = s.id
 	e, err := s.Store.Get(storeMembersPrefix, true, true)
 	if err != nil {
-		if v, ok := err.(*etcdErr.Error); ok && v.ErrorCode == etcdErr.EcodeKeyNotFound {
+		if isKeyNotFound(err) {
 			return *c
 		}
 		log.Panicf("get member should never fail: %v", err)
@@ -121,10 +122,25 @@ func nodeToMember(n *store.NodeExtern) (Member, error) {
 // Remove removes a member from the store.
 // The given id MUST exist.
+// After deleting the member's key it creates a permanent, empty-valued entry
+// at removedMemberStoreKey(id), which is the key IsRemoved looks up.
 func (s *clusterStore) Remove(id uint64) {
-	p := s.Get().FindID(id).storeKey()
-	if _, err := s.Store.Delete(p, true, true); err != nil {
+	if _, err := s.Store.Delete(memberStoreKey(id), true, true); err != nil {
 		log.Panicf("delete peer should never fail: %v", err)
 	}
+	if _, err := s.Store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
+		log.Panicf("creating RemovedMember should never fail: %v", err)
+	}
+}
+
+// IsRemoved reports whether the store holds an entry at
+// removedMemberStoreKey(id). A key-not-found error from the store yields
+// false; any other error is passed to log.Panicf.
+func (s *clusterStore) IsRemoved(id uint64) bool {
+	_, err := s.Store.Get(removedMemberStoreKey(id), false, false)
+	switch {
+	case err == nil:
+		return true
+	case isKeyNotFound(err):
+		return false
+	default:
+		log.Panicf("unexpected error when getting removed member %x: %v", id, err)
+		return false
+	}
 }
 
 // Sender creates the default production sender used to transport raft messages
@@ -206,9 +222,18 @@ func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
 		// TODO: shutdown the etcdserver gracefully?
 		log.Panicf("clusterID mismatch")
 		return false
+	case http.StatusForbidden:
+		// TODO: stop the server
+		log.Panicf("the member has been removed")
+		return false
 	case http.StatusNoContent:
 		return true
 	default:
 		return false
 	}
 }
+
+// isKeyNotFound reports whether err is an *etcdErr.Error whose ErrorCode is
+// etcdErr.EcodeKeyNotFound. Errors of any other type yield false.
+func isKeyNotFound(err error) bool {
+	e, ok := err.(*etcdErr.Error)
+	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
+}

+ 30 - 20
etcdserver/cluster_store_test.go

@@ -108,15 +108,39 @@ func TestClusterStoreGet(t *testing.T) {
 	}
 }
 
-func TestClusterStoreDelete(t *testing.T) {
-	st := newStoreGetAllAndDeleteRecorder()
+// TestClusterStoreRemove checks that Remove(1) makes the recorder report a
+// Delete of memberStoreKey(1) followed by a Create of removedMemberStoreKey(1).
+func TestClusterStoreRemove(t *testing.T) {
+	st := &storeRecorder{}
 	cs := &clusterStore{Store: st}
-	cs.Add(newTestMember(1, nil, "node1", nil))
 	cs.Remove(1)
 
-	wdeletes := []string{path.Join(storeMembersPrefix, "1")}
-	if !reflect.DeepEqual(st.deletes, wdeletes) {
-		t.Errorf("deletes = %v, want %v", st.deletes, wdeletes)
+	wactions := []action{
+		{name: "Delete", params: []interface{}{memberStoreKey(1), true, true}},
+		{name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
+	}
+	if !reflect.DeepEqual(st.Action(), wactions) {
+		t.Errorf("actions = %v, want %v", st.Action(), wactions)
+	}
+}
+
+// TestClusterStoreIsRemovedFalse checks that IsRemoved returns false when the
+// store is an errStoreRecorder configured with an EcodeKeyNotFound error.
+func TestClusterStoreIsRemovedFalse(t *testing.T) {
+	st := &errStoreRecorder{err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)}
+	cs := clusterStore{Store: st}
+	if ok := cs.IsRemoved(1); ok != false {
+		t.Errorf("IsRemoved = %v, want %v", ok, false)
+	}
+}
+
+// TestClusterStoreIsRemovedTrue checks that IsRemoved returns true when backed
+// by a plain storeRecorder (presumably its Get returns a nil error; confirm
+// against the recorder's definition) and that exactly one Get of
+// removedMemberStoreKey(1) is recorded.
+func TestClusterStoreIsRemovedTrue(t *testing.T) {
+	st := &storeRecorder{}
+	cs := &clusterStore{Store: st}
+	if ok := cs.IsRemoved(1); ok != true {
+		t.Errorf("IsRemoved = %v, want %v", ok, true)
+	}
+	wactions := []action{
+		{name: "Get", params: []interface{}{removedMemberStoreKey(1), false, false}},
+	}
+	if !reflect.DeepEqual(st.Action(), wactions) {
+		t.Errorf("actions = %v, want %v", st.Action(), wactions)
 	}
 }
 
@@ -201,20 +225,6 @@ func newGetAllStore() *getAllStore {
 	return &getAllStore{store.New()}
 }
 
-type storeGetAllAndDeleteRecorder struct {
-	*getAllStore
-	deletes []string
-}
-
-func newStoreGetAllAndDeleteRecorder() *storeGetAllAndDeleteRecorder {
-	return &storeGetAllAndDeleteRecorder{getAllStore: newGetAllStore()}
-}
-
-func (s *storeGetAllAndDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
-	s.deletes = append(s.deletes, key)
-	return nil, nil
-}
-
 func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
 	return Member{
 		ID:             id,

+ 9 - 4
etcdserver/etcdhttp/http.go

@@ -238,14 +238,19 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
-	if m.Type == raftpb.MsgApp {
-		h.stats.UpdateRecvApp(m.From, r.ContentLength)
-	}
 	if err := h.server.Process(context.TODO(), m); err != nil {
 		log.Println("etcdhttp: error processing raft message:", err)
-		writeError(w, err)
+		switch err {
+		case etcdserver.ErrRemoved:
+			http.Error(w, "cannot process message from removed node", http.StatusForbidden)
+		default:
+			writeError(w, err)
+		}
 		return
 	}
+	if m.Type == raftpb.MsgApp {
+		h.stats.UpdateRecvApp(m.From, r.ContentLength)
+	}
 	w.WriteHeader(http.StatusNoContent)
 }
 

+ 16 - 1
etcdserver/etcdhttp/http_test.go

@@ -922,7 +922,7 @@ func TestServeRaft(t *testing.T) {
 			http.StatusBadRequest,
 		},
 		{
-			// good request, etcdserver.Server error
+			// good request, etcdserver.Server internal error
 			"POST",
 			bytes.NewReader(
 				mustMarshalMsg(
@@ -934,6 +934,19 @@ func TestServeRaft(t *testing.T) {
 			"0",
 			http.StatusInternalServerError,
 		},
+		{
+			// good request from removed member
+			"POST",
+			bytes.NewReader(
+				mustMarshalMsg(
+					t,
+					raftpb.Message{},
+				),
+			),
+			etcdserver.ErrRemoved,
+			"0",
+			http.StatusForbidden,
+		},
 		{
 			// good request
 			"POST",
@@ -1654,3 +1667,5 @@ func (c *fakeCluster) Get() etcdserver.Cluster {
 }
 
 func (c *fakeCluster) Remove(id uint64) { return }
+
+// IsRemoved always reports false, whatever id is passed.
+func (c *fakeCluster) IsRemoved(id uint64) bool { return false }

+ 6 - 2
etcdserver/member.go

@@ -68,8 +68,8 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
 	return m
 }
 
-func (m Member) storeKey() string {
-	return path.Join(storeMembersPrefix, idAsHex(m.ID))
+// memberStoreKey returns the store path for the member with the given id:
+// storeMembersPrefix joined with idAsHex(id).
+func memberStoreKey(id uint64) string {
+	return path.Join(storeMembersPrefix, idAsHex(id))
 }
 
 func parseMemberID(key string) uint64 {
@@ -79,3 +79,7 @@ func parseMemberID(key string) uint64 {
 	}
 	return id
 }
+
+// removedMemberStoreKey returns the store path of the removed-member entry for
+// the given id: storeRemovedMembersPrefix joined with idAsHex(id).
+func removedMemberStoreKey(id uint64) string {
+	return path.Join(storeRemovedMembersPrefix, idAsHex(id))
+}

+ 12 - 4
etcdserver/server.go

@@ -56,11 +56,13 @@ const (
 var (
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrStopped       = errors.New("etcdserver: server stopped")
+	ErrRemoved       = errors.New("etcdserver: server removed")
 	ErrIDRemoved     = errors.New("etcdserver: ID removed")
 	ErrIDExists      = errors.New("etcdserver: ID exists")
 	ErrIDNotFound    = errors.New("etcdserver: ID not found")
 
-	storeMembersPrefix = path.Join(StoreAdminPrefix, "members")
+	storeMembersPrefix        = path.Join(StoreAdminPrefix, "members")
+	storeRemovedMembersPrefix = path.Join(StoreAdminPrefix, "removed_members")
 )
 
 func init() {
@@ -265,6 +267,9 @@ func (s *EtcdServer) start() {
 }
 
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
+	// Refuse messages whose sender the ClusterStore reports as removed;
+	// only the remaining messages are stepped into the raft node.
+	if s.ClusterStore.IsRemoved(m.From) {
+		return ErrRemoved
+	}
 	return s.node.Step(ctx, m)
 }
 
@@ -491,7 +496,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
 	req := pb.Request{
 		ID:     GenID(),
 		Method: "PUT",
-		Path:   Member{ID: s.id}.storeKey() + attributesSuffix,
+		Path:   memberStoreKey(s.id) + attributesSuffix,
 		Val:    string(b),
 	}
 
@@ -585,7 +590,7 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 }
 
 func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error {
-	if err := checkConfChange(cc, nodes); err != nil {
+	if err := s.checkConfChange(cc, nodes); err != nil {
 		cc.NodeID = raft.None
 		s.node.ApplyConfChange(cc)
 		return err
@@ -607,7 +612,10 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
 	return nil
 }
 
-func checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
+func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
+	if s.ClusterStore.IsRemoved(cc.NodeID) {
+		return ErrIDRemoved
+	}
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode:
 		if containsUint64(nodes, cc.NodeID) {

+ 31 - 3
etcdserver/server_test.go

@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math/rand"
+	"path"
 	"reflect"
 	"sync"
 	"testing"
@@ -386,10 +387,25 @@ func TestApplyRequest(t *testing.T) {
 // TODO: test ErrIDRemoved
 func TestApplyConfChangeError(t *testing.T) {
 	nodes := []uint64{1, 2, 3}
+	removed := map[uint64]bool{4: true}
 	tests := []struct {
 		cc   raftpb.ConfChange
 		werr error
 	}{
+		{
+			raftpb.ConfChange{
+				Type:   raftpb.ConfChangeAddNode,
+				NodeID: 4,
+			},
+			ErrIDRemoved,
+		},
+		{
+			raftpb.ConfChange{
+				Type:   raftpb.ConfChangeRemoveNode,
+				NodeID: 4,
+			},
+			ErrIDRemoved,
+		},
 		{
 			raftpb.ConfChange{
 				Type:   raftpb.ConfChangeAddNode,
@@ -407,8 +423,10 @@ func TestApplyConfChangeError(t *testing.T) {
 	}
 	for i, tt := range tests {
 		n := &nodeRecorder{}
+		cs := &removedClusterStore{removed: removed}
 		srv := &EtcdServer{
-			node: n,
+			node:         n,
+			ClusterStore: cs,
 		}
 		err := srv.applyConfChange(tt.cc, nodes)
 		if err != tt.werr {
@@ -950,8 +968,8 @@ func TestPublish(t *testing.T) {
 		t.Errorf("method = %s, want PUT", r.Method)
 	}
 	wm := Member{ID: 1, Attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
-	if r.Path != wm.storeKey()+attributesSuffix {
-		t.Errorf("path = %s, want %s", r.Path, wm.storeKey()+attributesSuffix)
+	if w := path.Join(memberStoreKey(wm.ID), attributesSuffix); r.Path != w {
+		t.Errorf("path = %s, want %s", r.Path, w)
 	}
 	var gattr Attributes
 	if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
@@ -1312,6 +1330,16 @@ func (cs *clusterStoreRecorder) Get() Cluster {
 func (cs *clusterStoreRecorder) Remove(id uint64) {
 	cs.record(action{name: "Remove", params: []interface{}{id}})
 }
+// IsRemoved always reports false; it does not record an action.
+func (cs *clusterStoreRecorder) IsRemoved(id uint64) bool { return false }
+
+// removedClusterStore is a test stand-in for ClusterStore. IsRemoved answers
+// from the fixed removed map (ids absent from the map yield false); Add and
+// Remove do nothing and Get returns an empty Cluster.
+type removedClusterStore struct {
+	removed map[uint64]bool
+}
+
+func (cs *removedClusterStore) Add(m Member)             {}
+func (cs *removedClusterStore) Get() Cluster             { return Cluster{} }
+func (cs *removedClusterStore) Remove(id uint64)         {}
+func (cs *removedClusterStore) IsRemoved(id uint64) bool { return cs.removed[id] }
 
 func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer {
 	peers := make([]raft.Peer, len(ids))