Browse Source

*: add pauseMember test

Xiang Li 11 years ago
parent
commit
151f043414

+ 23 - 0
etcdserver/sendhub.go

@@ -32,6 +32,16 @@ const (
 	raftPrefix = "/raft"
 )
 
+type SendHub interface {
+	rafthttp.SenderFinder
+	Send(m []raftpb.Message)
+	Add(m *Member)
+	Remove(id types.ID)
+	Update(m *Member)
+	Stop()
+	ShouldStopNotify() <-chan struct{}
+}
+
 type sendHub struct {
 	tr         http.RoundTripper
 	cl         ClusterInfo
@@ -129,3 +139,16 @@ func (h *sendHub) Update(m *Member) {
 	u.Path = path.Join(u.Path, raftPrefix)
 	h.senders[m.ID].Update(u.String())
 }
+
+// for testing
+func (h *sendHub) pause() {
+	for _, s := range h.senders {
+		s.Pause()
+	}
+}
+
+func (h *sendHub) resume() {
+	for _, s := range h.senders {
+		s.Resume()
+	}
+}

+ 11 - 10
etcdserver/server.go

@@ -90,16 +90,6 @@ type Response struct {
 	err     error
 }
 
-type SendHub interface {
-	rafthttp.SenderFinder
-	Send(m []raftpb.Message)
-	Add(m *Member)
-	Remove(id types.ID)
-	Update(m *Member)
-	Stop()
-	ShouldStopNotify() <-chan struct{}
-}
-
 type Storage interface {
 	// Save function saves ents and state to the underlying stable storage.
 	// Save MUST block until st and ents are on stable storage.
@@ -860,6 +850,17 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 	log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
 }
 
+// for testing
+func (s *EtcdServer) PauseSending() {
+	hub := s.sendhub.(*sendHub)
+	hub.pause()
+}
+
+func (s *EtcdServer) ResumeSending() {
+	hub := s.sendhub.(*sendHub)
+	hub.resume()
+}
+
 // checkClientURLsEmptyFromPeers does its best to get the cluster from peers,
 // and if this succeeds, checks that the member of the given id exists in the
 // cluster, and its ClientURLs is empty.

+ 17 - 3
integration/cluster_test.go

@@ -34,6 +34,7 @@ import (
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
+	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
@@ -391,8 +392,9 @@ type member struct {
 	etcdserver.ServerConfig
 	PeerListeners, ClientListeners []net.Listener
 
-	s   *etcdserver.EtcdServer
-	hss []*httptest.Server
+	raftHandler *testutil.PauseableHandler
+	s           *etcdserver.EtcdServer
+	hss         []*httptest.Server
 }
 
 func mustNewMember(t *testing.T, name string) *member {
@@ -469,10 +471,12 @@ func (m *member) Launch() error {
 	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
 	m.s.Start()
 
+	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
+
 	for _, ln := range m.PeerListeners {
 		hs := &httptest.Server{
 			Listener: ln,
-			Config:   &http.Server{Handler: etcdhttp.NewPeerHandler(m.s)},
+			Config:   &http.Server{Handler: m.raftHandler},
 		}
 		hs.Start()
 		m.hss = append(m.hss, hs)
@@ -488,6 +492,16 @@ func (m *member) Launch() error {
 	return nil
 }
 
+func (m *member) Pause() {
+	m.raftHandler.Pause()
+	m.s.PauseSending()
+}
+
+func (m *member) Resume() {
+	m.raftHandler.Resume()
+	m.s.ResumeSending()
+}
+
 // Stop stops the member, but the data dir of the member is preserved.
 func (m *member) Stop(t *testing.T) {
 	m.s.Stop()

+ 16 - 0
integration/member_test.go

@@ -4,8 +4,24 @@ import (
 	"io/ioutil"
 	"os"
 	"testing"
+	"time"
 )
 
+func TestPauseMember(t *testing.T) {
+	defer afterTest(t)
+	c := NewCluster(t, 5)
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	for i := 0; i < 5; i++ {
+		c.Members[i].Pause()
+		time.Sleep(20 * tickDuration)
+		c.Members[i].Resume()
+	}
+	c.waitLeader(t)
+	clusterMustProgress(t, c)
+}
+
 func TestRestartMember(t *testing.T) {
 	defer afterTest(t)
 	c := NewCluster(t, 3)

+ 61 - 0
pkg/testutil/pauseable_handler.go

@@ -0,0 +1,61 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package testutil
+
+import (
+	"net/http"
+	"sync"
+)
+
+type PauseableHandler struct {
+	Next   http.Handler
+	mu     sync.Mutex
+	paused bool
+}
+
+func (ph *PauseableHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	ph.mu.Lock()
+	paused := ph.paused
+	ph.mu.Unlock()
+	if !paused {
+		ph.Next.ServeHTTP(w, r)
+	} else {
+		hj, ok := w.(http.Hijacker)
+		if !ok {
+			panic("webserver doesn't support hijacking")
+			return
+		}
+		conn, _, err := hj.Hijack()
+		if err != nil {
+			panic(err.Error())
+			return
+		}
+		conn.Close()
+	}
+}
+
+func (ph *PauseableHandler) Pause() {
+	ph.mu.Lock()
+	defer ph.mu.Unlock()
+	ph.paused = true
+}
+
+func (ph *PauseableHandler) Resume() {
+	ph.mu.Lock()
+	defer ph.mu.Unlock()
+	ph.paused = false
+}

+ 29 - 2
rafthttp/sender.go

@@ -51,6 +51,13 @@ type Sender interface {
 	// Stop performs any necessary finalization and terminates the Sender
 	// elegantly.
 	Stop()
+
+	// Pause pauses the sender. The sender will simply drops all incoming
+	// messages without retruning an error.
+	Pause()
+
+	// Resume resumes a paused sender.
+	Resume()
 }
 
 func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
@@ -85,8 +92,9 @@ type sender struct {
 	strmSrvMu sync.Mutex
 	q         chan []byte
 
-	mu sync.RWMutex
-	wg sync.WaitGroup
+	paused bool
+	mu     sync.RWMutex
+	wg     sync.WaitGroup
 }
 
 func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) {
@@ -112,6 +120,13 @@ func (s *sender) Update(u string) {
 
 // TODO (xiangli): reasonable retry logic
 func (s *sender) Send(m raftpb.Message) error {
+	s.mu.RLock()
+	pause := s.paused
+	s.mu.RUnlock()
+	if pause {
+		return nil
+	}
+
 	s.maybeStopStream(m.Term)
 	if shouldInitStream(m) && !s.hasStreamClient() {
 		s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
@@ -152,6 +167,18 @@ func (s *sender) Stop() {
 	}
 }
 
+func (s *sender) Pause() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.paused = true
+}
+
+func (s *sender) Resume() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.paused = false
+}
+
 func (s *sender) maybeStopStream(term uint64) {
 	if s.strmCln != nil && term > s.strmCln.term {
 		s.strmCln.stop()