|
|
@@ -154,6 +154,56 @@ func TestDoBadLocalAction(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// TestApplyRepeat tests that server handles repeat raft messages gracefully
|
|
|
+func TestApplyRepeat(t *testing.T) {
|
|
|
+ n := newNodeConfChangeCommitterRecorder()
|
|
|
+ n.readyc <- raft.Ready{
|
|
|
+ SoftState: &raft.SoftState{RaftState: raft.StateLeader},
|
|
|
+ }
|
|
|
+ cl := newTestCluster(nil)
|
|
|
+ st := store.New()
|
|
|
+ cl.SetStore(store.New())
|
|
|
+ cl.AddMember(&Member{ID: 1234})
|
|
|
+ s := &EtcdServer{
|
|
|
+ r: raftNode{
|
|
|
+ Node: n,
|
|
|
+ raftStorage: raft.NewMemoryStorage(),
|
|
|
+ storage: &storageRecorder{},
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
+ },
|
|
|
+ cfg: &ServerConfig{},
|
|
|
+ store: st,
|
|
|
+ cluster: cl,
|
|
|
+ reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
+ }
|
|
|
+ s.start()
|
|
|
+ req := &pb.Request{Method: "QGET", ID: uint64(1)}
|
|
|
+ ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
|
|
|
+ n.readyc <- raft.Ready{CommittedEntries: ents}
|
|
|
+ // dup msg
|
|
|
+ n.readyc <- raft.Ready{CommittedEntries: ents}
|
|
|
+
|
|
|
+ // use a conf change to block until dup msgs are all processed
|
|
|
+ cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
|
|
|
+ ents = []raftpb.Entry{{
|
|
|
+ Index: 2,
|
|
|
+ Type: raftpb.EntryConfChange,
|
|
|
+ Data: pbutil.MustMarshal(cc),
|
|
|
+ }}
|
|
|
+ n.readyc <- raft.Ready{CommittedEntries: ents}
|
|
|
+ act, err := n.Wait(1)
|
|
|
+ s.Stop()
|
|
|
+
|
|
|
+ // only want to confirm etcdserver won't panic; no data to check
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+ if len(act) == 0 {
|
|
|
+ t.Fatalf("expected len(act)=0, got %d", len(act))
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestApplyRequest(t *testing.T) {
|
|
|
tests := []struct {
|
|
|
req pb.Request
|