Browse Source

etcd: refactor remove

Xiang Li 11 years ago
parent
commit
3ea913e76a
3 changed files with 73 additions and 70 deletions
  1. 44 24
      etcd/etcd.go
  2. 28 45
      etcd/etcd_test.go
  3. 1 1
      etcd/v2_http_endpoint_test.go

+ 44 - 24
etcd/etcd.go

@@ -41,6 +41,11 @@ const (
 	stop
 )
 
+var (
+	removeTmpErr  = fmt.Errorf("remove: try again")
+	serverStopErr = fmt.Errorf("server is stopped")
+)
+
 type Server struct {
 	config *config.Config
 
@@ -147,6 +152,10 @@ func (s *Server) Run() {
 }
 
 func (s *Server) Stop() {
+	if s.mode == stop {
+		return
+	}
+	s.mode = stop
 	close(s.stop)
 	s.t.stop()
 }
@@ -194,16 +203,17 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 
 	_, err := s.Get(p, false, false)
 	if err == nil {
-		return fmt.Errorf("existed node")
+		return nil
 	}
 	if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
 		return err
 	}
 	for {
-		if s.mode == stop {
+		select {
+		case s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
+		case <-s.stop:
 			return fmt.Errorf("server is stopped")
 		}
-		s.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}
 		w, err := s.Watch(p, true, false, index+1)
 		if err != nil {
 			return err
@@ -215,34 +225,45 @@ func (s *Server) Add(id int64, raftPubAddr string, pubAddr string) error {
 			}
 			index = v.Index()
 		case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+		case <-s.stop:
+			return fmt.Errorf("server is stopped")
 		}
 	}
 }
 
 func (s *Server) Remove(id int64) error {
 	p := path.Join(v2machineKVPrefix, fmt.Sprint(id))
-	index := s.Index()
 
-	if _, err := s.Get(p, false, false); err != nil {
-		return err
+	v, err := s.Get(p, false, false)
+	if err != nil {
+		return nil
 	}
-	for {
-		if s.mode == stop {
-			return fmt.Errorf("server is stopped")
-		}
-		s.removeNodeC <- raft.Config{NodeId: id}
-		w, err := s.Watch(p, true, false, index+1)
-		if err != nil {
-			return err
-		}
-		select {
-		case v := <-w.EventChan:
-			if v.Action == store.Delete {
-				return nil
-			}
-			index = v.Index()
-		case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+
+	select {
+	case s.removeNodeC <- raft.Config{NodeId: id}:
+	case <-s.stop:
+		return serverStopErr
+	}
+
+	// TODO(xiangli): do not need to watch if the
+	// removal target is self
+	w, err := s.Watch(p, true, false, v.Index()+1)
+	if err != nil {
+		return removeTmpErr
+	}
+
+	select {
+	case v := <-w.EventChan:
+		if v.Action == store.Delete {
+			return nil
 		}
+		return removeTmpErr
+	case <-time.After(4 * defaultHeartbeat * s.tickDuration):
+		w.Remove()
+		return removeTmpErr
+	case <-s.stop:
+		w.Remove()
+		return serverStopErr
 	}
 }
 
@@ -291,15 +312,14 @@ func (s *Server) runParticipant() {
 			node.Sync()
 		case <-s.stop:
 			log.Printf("Node: %d stopped\n", s.id)
-			s.mode = stop
 			return
 		}
 		s.apply(node.Next())
 		s.send(node.Msgs())
 		if node.IsRemoved() {
 			// TODO: delete it after standby is implemented
-			s.mode = stop
 			log.Printf("Node: %d removed from participants\n", s.id)
+			s.Stop()
 			return
 		}
 	}

+ 28 - 45
etcd/etcd_test.go

@@ -135,78 +135,61 @@ func TestAdd(t *testing.T) {
 		for i := range hs {
 			hs[len(hs)-i-1].Close()
 		}
-		afterTest(t)
 	}
+	afterTest(t)
 }
 
 func TestRemove(t *testing.T) {
-	tests := []struct {
-		size  int
-		round int
-	}{
-		{3, 5},
-		{4, 5},
-		{5, 5},
-		{6, 5},
-	}
+	tests := []int{3, 4, 5, 6}
 
 	for _, tt := range tests {
-		es, hs := buildCluster(tt.size, false)
+		es, hs := buildCluster(tt, false)
 		waitCluster(t, es)
 
 		// we don't remove the machine from 2-node cluster because it is
 		// not 100 percent safe in our raft.
 		// TODO(yichengq): improve it later.
-		for i := 0; i < tt.size-2; i++ {
+		for i := 0; i < tt-2; i++ {
 			id := int64(i)
-			var index uint64
+			send := id
 			for {
-				lead := es[id].node.Leader()
-				if lead != -1 {
-					index = es[lead].Index()
-					if err := es[lead].Remove(id); err == nil {
-						break
-					}
+				send++
+				if send > int64(tt-1) {
+					send = id
 				}
-				runtime.Gosched()
-			}
 
-			// i-th machine cannot be promised to apply the removal command of
-			// its own due to our non-optimized raft.
-			// TODO(yichengq): it should work when
-			// https://github.com/etcd-team/etcd/pull/7 is merged.
-			for j := i + 1; j < tt.size; j++ {
-				w, err := es[j].Watch(v2machineKVPrefix, true, false, index+1)
-				if err != nil {
-					t.Errorf("#%d on %d: %v", i, j, err)
+				lead := es[send].node.Leader()
+				if lead == -1 {
+					time.Sleep(defaultElection * 5 * time.Millisecond)
+					continue
+				}
+
+				err := es[lead].Remove(id)
+				if err == nil {
 					break
 				}
-				v := <-w.EventChan
-				ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, i)
-				if v.Node.Key != ww {
-					t.Errorf("#%d on %d: path = %v, want %v", i, j, v.Node.Key, ww)
+				switch err {
+				case removeTmpErr:
+					time.Sleep(defaultElection * 5 * time.Millisecond)
+				case serverStopErr:
+					if lead == id {
+						break
+					}
+				default:
+					t.Fatal(err)
 				}
 			}
-
-			// may need to wait for msgDenial
-			// TODO(yichengq): no need to sleep here when previous issue is merged.
-			if es[i].mode == stop {
-				continue
-			}
-			time.Sleep(defaultElection * defaultTickDuration)
-			if g := es[i].mode; g != stop {
-				t.Errorf("#%d: mode = %d, want stop", i, g)
-			}
+			<-es[i].stop
 		}
 
-		for i := range hs {
+		for i := range es {
 			es[len(hs)-i-1].Stop()
 		}
 		for i := range hs {
 			hs[len(hs)-i-1].Close()
 		}
-		afterTest(t)
 	}
+	afterTest(t)
 }
 
 func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) {

+ 1 - 1
etcd/v2_http_endpoint_test.go

@@ -216,8 +216,8 @@ func TestPutAdminConfigEndPoint(t *testing.T) {
 		for j := range hs {
 			hs[len(hs)-j-1].Close()
 		}
-		afterTest(t)
 	}
+	afterTest(t)
 }
 
 func TestGetAdminMachineEndPoint(t *testing.T) {