Browse Source

server: clear proposal wait in time

Yicheng Qin 11 years ago
parent
commit
16e9aa77e3
3 changed files with 12 additions and 2 deletions
  1. 1 0
      etcd/etcd.go
  2. 4 2
      etcd/v2_apply.go
  3. 7 0
      etcd/v2_raft.go

+ 1 - 0
etcd/etcd.go

@@ -298,6 +298,7 @@ func (s *Server) runParticipant() {
 	recv := s.t.recv
 	ticker := time.NewTicker(s.tickDuration)
 	v2SyncTicker := time.NewTicker(time.Millisecond * 500)
+	defer node.StopProposalWaiters()
 
 	var proposal chan v2Proposal
 	var addNodeC, removeNodeC chan raft.Config

+ 4 - 2
etcd/v2_apply.go

@@ -50,7 +50,8 @@ func (s *Server) v2apply(index int64, ent raft.Entry) {
 		}
 	}
 
-	if s.node.result[wait{index, ent.Term}] == nil {
+	w := wait{index, ent.Term}
+	if s.node.result[w] == nil {
 		return
 	}
 
@@ -59,5 +60,6 @@ func (s *Server) v2apply(index int64, ent raft.Entry) {
 	} else {
 		ret = e
 	}
-	s.node.result[wait{index, ent.Term}] <- ret
+	s.node.result[w] <- ret
+	delete(s.node.result, w)
 }

+ 7 - 0
etcd/v2_raft.go

@@ -45,3 +45,10 @@ func (r *v2Raft) Sync() {
 	}
 	r.Node.Propose(data)
 }
+
+func (r *v2Raft) StopProposalWaiters() {
+	for k, ch := range r.result {
+		ch <- fmt.Errorf("server is stopped or removed from participant")
+		delete(r.result, k)
+	}
+}