Browse Source

Merge pull request #7574 from fanminshi/fix_mem_leak

raft: use rs.req.Entries[0].Data as the key for deletion in advance()
fanmin shi 8 years ago
parent
commit
631f790689
2 changed files with 50 additions and 1 deletions
  1. 49 0
      raft/raft_test.go
  2. 1 1
      raft/read_only.go

+ 49 - 0
raft/raft_test.go

@@ -1246,6 +1246,55 @@ func TestHandleHeartbeatResp(t *testing.T) {
 	}
 }
 
+// TestRaftFreesReadOnlyMem ensures raft will free read request from
+// readOnly readIndexQueue and pendingReadIndex map.
+// related issue: https://github.com/coreos/etcd/issues/7571
+func TestRaftFreesReadOnlyMem(t *testing.T) {
+	sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
+	sm.becomeCandidate()
+	sm.becomeLeader()
+	sm.raftLog.commitTo(sm.raftLog.lastIndex())
+
+	ctx := []byte("ctx")
+
+	// leader starts linearizable read request.
+	// more info: raft dissertation 6.4, step 2.
+	sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
+	msgs := sm.readMessages()
+	if len(msgs) != 1 {
+		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
+	}
+	if msgs[0].Type != pb.MsgHeartbeat {
+		t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
+	}
+	if !bytes.Equal(msgs[0].Context, ctx) {
+		t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
+	}
+	if len(sm.readOnly.readIndexQueue) != 1 {
+		t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
+	}
+	if len(sm.readOnly.pendingReadIndex) != 1 {
+		t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
+	}
+	if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
+		t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
+	}
+
+	// heartbeat responses from majority of followers (1 in this case)
+	// acknowledge the authority of the leader.
+	// more info: raft dissertation 6.4, step 3.
+	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
+	if len(sm.readOnly.readIndexQueue) != 0 {
+		t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
+	}
+	if len(sm.readOnly.pendingReadIndex) != 0 {
+		t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
+	}
+	if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
+		t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
+	}
+}
+
 // TestMsgAppRespWaitReset verifies the resume behavior of a leader
 // MsgAppResp.
 func TestMsgAppRespWaitReset(t *testing.T) {

+ 1 - 1
raft/read_only.go

@@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
 	if found {
 		ro.readIndexQueue = ro.readIndexQueue[i:]
 		for _, rs := range rss {
-			delete(ro.pendingReadIndex, string(rs.req.Context))
+			delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
 		}
 		return rss
 	}