Browse Source

Merge pull request #7856 from fanminshi/fix_consistent_index_update

etcdserver: apply() sets consistIndex for any entry type
fanmin shi 8 years ago
parent
commit
de2e959b27
2 changed files with 48 additions and 0 deletions
  1. 5 0
      etcdserver/server.go
  2. 43 0
      etcdserver/server_test.go

+ 5 - 0
etcdserver/server.go

@@ -1272,9 +1272,14 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appl
 		case raftpb.EntryNormal:
 			s.applyEntryNormal(&e)
 		case raftpb.EntryConfChange:
+			// set the consistent index of current executing entry
+			if e.Index > s.consistIndex.ConsistentIndex() {
+				s.consistIndex.setConsistentIndex(e.Index)
+			}
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			removedSelf, err := s.applyConfChange(cc, confState)
+			s.setAppliedIndex(e.Index)
 			shouldStop = shouldStop || removedSelf
 			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
 		default:

+ 43 - 0
etcdserver/server_test.go

@@ -585,6 +585,49 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 	}
 }
 
+// TestApplyConfigChangeUpdatesConsistIndex ensures a config change also updates the consistIndex
+// where consistIndex equals to applied index.
+func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
+	cl := membership.NewCluster("")
+	cl.SetStore(store.New())
+	cl.AddMember(&membership.Member{ID: types.ID(1)})
+	r := newRaftNode(raftNodeConfig{
+		Node:      newNodeNop(),
+		transport: rafthttp.NewNopTransporter(),
+	})
+	srv := &EtcdServer{
+		id:      1,
+		r:       *r,
+		cluster: cl,
+		w:       wait.New(),
+	}
+
+	// create EntryConfChange entry
+	now := time.Now()
+	urls, err := types.NewURLs([]string{"http://whatever:123"})
+	if err != nil {
+		t.Fatal(err)
+	}
+	m := membership.NewMember("", urls, "", &now)
+	m.ID = types.ID(2)
+	b, err := json.Marshal(m)
+	if err != nil {
+		t.Fatal(err)
+	}
+	cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b}
+	ents := []raftpb.Entry{{
+		Index: 2,
+		Type:  raftpb.EntryConfChange,
+		Data:  pbutil.MustMarshal(cc),
+	}}
+
+	_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
+	consistIndex := srv.consistIndex.ConsistentIndex()
+	if consistIndex != appliedi {
+		t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
+	}
+}
+
 // TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
 // if the local member is removed along with other conf updates.
 func TestApplyMultiConfChangeShouldStop(t *testing.T) {