
rafthttp: add requester to transport if peer does not exist

cluster integration now supports adding members with stopped nodes, too

Fixes #3699
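
In short, the fix is a small handshake: when a peer's streamReader dials a remote node it now advertises its own peer URLs in an X-Server-Peers header, and the remote streamHandler, on finding no peer registered for the sender, adds the sender to its Transport from that header and retries the lookup. The sketch below only illustrates that idea; it uses simplified stand-in types and a hypothetical X-Raft-From header to carry the sender ID (the real handler identifies the sender differently and works against the actual rafthttp Transport and peerGetter), while the X-Server-Peers header and the add-then-retry lookup mirror the diff that follows.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// transport is a simplified stand-in for rafthttp.Transport: it only tracks
// known peer IDs and their advertised URLs.
type transport struct {
	peers map[string][]string // peer ID -> advertised peer URLs
}

func (t *transport) get(id string) []string           { return t.peers[id] }
func (t *transport) addPeer(id string, urls []string) { t.peers[id] = urls }

// streamHandler mirrors the new p == nil branch in rafthttp/http.go: if the
// sender is unknown but advertised its URLs, register it and retry the lookup.
func (t *transport) streamHandler(w http.ResponseWriter, r *http.Request) {
	// X-Raft-From is a hypothetical header used only in this sketch; the real
	// handler derives the sender ID from the request itself.
	from := r.Header.Get("X-Raft-From")
	if t.get(from) == nil {
		if urls := r.Header.Get("X-Server-Peers"); urls != "" {
			t.addPeer(from, strings.Split(urls, ","))
		}
	}
	if t.get(from) == nil {
		http.Error(w, "unknown peer", http.StatusNotFound)
		return
	}
	fmt.Fprintf(w, "stream attached for peer %s\n", from)
}

// dial mirrors the requester side added in rafthttp/stream.go: the local peer
// URLs ride along in X-Server-Peers so the remote can register the caller
// even if it has not applied the membership change yet.
func dial(endpoint, localID string, localURLs []string) (*http.Response, error) {
	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Raft-From", localID)
	req.Header.Set("X-Server-Peers", strings.Join(localURLs, ","))
	return http.DefaultClient.Do(req)
}

func main() {
	tr := &transport{peers: map[string][]string{}}
	srv := httptest.NewServer(http.HandlerFunc(tr.streamHandler))
	defer srv.Close()

	// The server has never heard of peer "d"; the advertised URLs let it
	// register the peer on the fly instead of rejecting the stream.
	resp, err := dial(srv.URL, "d", []string{"http://127.0.0.1:12381"})
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}

Running the sketch prints a 200 status because the unknown sender is registered on the fly instead of being rejected.
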
Anthony Romano, 10 years ago
commit db0b505de5
9 changed files with 175 additions and 61 deletions
  1. integration/cluster.go (+43 -16)
  2. integration/cluster_test.go (+47 -0)
  3. rafthttp/http.go (+12 -2)
  4. rafthttp/http_test.go (+11 -5)
  5. rafthttp/peer.go (+16 -5)
  6. rafthttp/stream.go (+21 -13)
  7. rafthttp/stream_test.go (+22 -17)
  8. rafthttp/transport.go (+2 -2)
  9. rafthttp/transport_test.go (+1 -1)

+ 43 - 16
integration/cluster.go

@@ -153,9 +153,15 @@ func (c *cluster) URL(i int) string {
 	return c.Members[i].ClientURLs[0].String()
 }
 
+// URLs returns a list of all active client URLs in the cluster
 func (c *cluster) URLs() []string {
 	urls := make([]string, 0)
 	for _, m := range c.Members {
+		select {
+		case <-m.s.StopNotify():
+			continue
+		default:
+		}
 		for _, u := range m.ClientURLs {
 			urls = append(urls, u.String())
 		}
@@ -163,9 +169,10 @@ func (c *cluster) URLs() []string {
 	return urls
 }
 
+// HTTPMembers returns a list of all active members as client.Members
 func (c *cluster) HTTPMembers() []client.Member {
-	ms := make([]client.Member, len(c.Members))
-	for i, m := range c.Members {
+	ms := []client.Member{}
+	for _, m := range c.Members {
 		pScheme, cScheme := "http", "http"
 		if m.PeerTLSInfo != nil {
 			pScheme = "https"
@@ -173,13 +180,14 @@ func (c *cluster) HTTPMembers() []client.Member {
 		if m.ClientTLSInfo != nil {
 			cScheme = "https"
 		}
-		ms[i].Name = m.Name
+		cm := client.Member{Name: m.Name}
 		for _, ln := range m.PeerListeners {
-			ms[i].PeerURLs = append(ms[i].PeerURLs, pScheme+"://"+ln.Addr().String())
+			cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
 		}
 		for _, ln := range m.ClientListeners {
-			ms[i].ClientURLs = append(ms[i].ClientURLs, cScheme+"://"+ln.Addr().String())
+			cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
 		}
+		ms = append(ms, cm)
 	}
 	return ms
 }
@@ -206,18 +214,17 @@ func (c *cluster) addMember(t *testing.T) {
 	}
 
 	// send add request to the cluster
-	cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
-	ma := client.NewMembersAPI(cc)
-	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-	peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
-	if _, err := ma.Add(ctx, peerURL); err != nil {
-		t.Fatalf("add member on %s error: %v", c.URL(0), err)
+	var err error
+	for i := 0; i < len(c.Members); i++ {
+		clientURL := c.URL(i)
+		peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
+		if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
+			break
+		}
+	}
+	if err != nil {
+		t.Fatalf("add member failed on all members error: %v", err)
 	}
-	cancel()
-
-	// wait for the add node entry applied in the cluster
-	members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
-	c.waitMembersMatch(t, members)
 
 	m.InitialPeerURLsMap = types.URLsMap{}
 	for _, mm := range c.Members {
@@ -233,6 +240,21 @@ func (c *cluster) addMember(t *testing.T) {
 	c.waitMembersMatch(t, c.HTTPMembers())
 }
 
+func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
+	cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
+	ma := client.NewMembersAPI(cc)
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	if _, err := ma.Add(ctx, peerURL); err != nil {
+		return err
+	}
+	cancel()
+
+	// wait for the add node entry applied in the cluster
+	members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
+	c.waitMembersMatch(t, members)
+	return nil
+}
+
 func (c *cluster) AddMember(t *testing.T) {
 	c.addMember(t)
 }
@@ -299,6 +321,11 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
 	for lead == 0 || !possibleLead[lead] {
 		lead = 0
 		for _, m := range membs {
+			select {
+			case <-m.s.StopNotify():
+				continue
+			default:
+			}
 			if lead != 0 && lead != m.s.Lead() {
 				lead = 0
 				break

+ 47 - 0
integration/cluster_test.go

@@ -290,6 +290,53 @@ func TestIssue2904(t *testing.T) {
 	c.waitMembersMatch(t, c.HTTPMembers())
 }
 
+// TestIssue3699 tests minority failure during cluster configuration; it was
+// deadlocking.
+func TestIssue3699(t *testing.T) {
+	// start a cluster of 3 nodes a, b, c
+	defer testutil.AfterTest(t)
+	c := NewCluster(t, 3)
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	// make node a unavailable
+	c.Members[0].Stop(t)
+	<-c.Members[0].s.StopNotify()
+
+	// add node d
+	c.AddMember(t)
+
+	// electing node d as leader makes node a unable to participate
+	leaderID := c.waitLeader(t, c.Members)
+	for leaderID != 3 {
+		c.Members[leaderID].Stop(t)
+		<-c.Members[leaderID].s.StopNotify()
+		c.Members[leaderID].Restart(t)
+		leaderID = c.waitLeader(t, c.Members)
+	}
+
+	// bring back node a
+	// node a will remain useless as long as d is the leader.
+	err := c.Members[0].Restart(t)
+	select {
+	case <-c.Members[0].s.StopNotify():
+		t.Fatalf("should not be stopped")
+	default:
+	}
+	// must waitLeader so goroutines don't leak on terminate
+	leaderID = c.waitLeader(t, c.Members)
+
+	// try to participate in cluster
+	cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
+	kapi := client.NewKeysAPI(cc)
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	_, err = kapi.Set(ctx, "/foo", "bar", nil)
+	cancel()
+	if err != nil {
+		t.Fatalf("unexpected error on Set (%v)", err)
+	}
+}
+
 // clusterMustProgress ensures that cluster can make progress. It creates
 // a random key first, and check the new key could be got from all client urls
 // of the cluster.

+ 12 - 2
rafthttp/http.go

@@ -20,6 +20,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"path"
+	"strings"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	pioutil "github.com/coreos/etcd/pkg/ioutil"
@@ -198,15 +199,17 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 type streamHandler struct {
+	tr         *Transport
 	peerGetter peerGetter
 	r          Raft
 	id         types.ID
 	cid        types.ID
 }
 
-func newStreamHandler(peerGetter peerGetter, r Raft, id, cid types.ID) http.Handler {
+func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
 	return &streamHandler{
-		peerGetter: peerGetter,
+		tr:         tr,
+		peerGetter: pg,
 		r:          r,
 		id:         id,
 		cid:        cid,
@@ -253,6 +256,13 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	p := h.peerGetter.Get(from)
+	if p == nil {
+		if urls := r.Header.Get("X-Server-Peers"); urls != "" {
+			h.tr.AddPeer(from, strings.Split(urls, ","))
+		}
+		p = h.peerGetter.Get(from)
+	}
+
 	if p == nil {
 		// This may happen in following cases:
 		// 1. user starts a remote peer that belongs to a different cluster

+ 11 - 5
rafthttp/http_test.go

@@ -21,6 +21,7 @@ import (
 	"io"
 	"net/http"
 	"net/http/httptest"
+	"net/url"
 	"strings"
 	"testing"
 	"time"
@@ -183,7 +184,8 @@ func TestServeRaftStreamPrefix(t *testing.T) {
 
 		peer := newFakePeer()
 		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
-		h := newStreamHandler(peerGetter, &fakeRaft{}, types.ID(2), types.ID(1))
+		tr := &Transport{}
+		h := newStreamHandler(tr, peerGetter, &fakeRaft{}, types.ID(2), types.ID(1))
 
 		rw := httptest.NewRecorder()
 		go h.ServeHTTP(rw, req)
@@ -296,9 +298,10 @@ func TestServeRaftStreamPrefixBad(t *testing.T) {
 		req.Header.Set("X-Server-Version", version.Version)
 		req.Header.Set("X-Raft-To", tt.remote)
 		rw := httptest.NewRecorder()
+		tr := &Transport{}
 		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}}
 		r := &fakeRaft{removedID: removedID}
-		h := newStreamHandler(peerGetter, r, types.ID(1), types.ID(1))
+		h := newStreamHandler(tr, peerGetter, r, types.ID(1), types.ID(1))
 		h.ServeHTTP(rw, req)
 
 		if rw.Code != tt.wcode {
@@ -343,19 +346,22 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
 type fakePeer struct {
 	msgs     []raftpb.Message
 	snapMsgs []snap.Message
-	urls     types.URLs
+	peerURLs types.URLs
 	connc    chan *outgoingConn
 }
 
 func newFakePeer() *fakePeer {
+	fakeURL, _ := url.Parse("http://localhost")
 	return &fakePeer{
-		connc: make(chan *outgoingConn, 1),
+		connc:    make(chan *outgoingConn, 1),
+		peerURLs: types.URLs{*fakeURL},
 	}
 }
 
 func (pr *fakePeer) send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
 func (pr *fakePeer) sendSnap(m snap.Message)               { pr.snapMsgs = append(pr.snapMsgs, m) }
-func (pr *fakePeer) update(urls types.URLs)                { pr.urls = urls }
+func (pr *fakePeer) update(urls types.URLs)                { pr.peerURLs = urls }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
 func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }
 func (pr *fakePeer) stop()                                 {}
+func (pr *fakePeer) urls() types.URLs                      { return pr.peerURLs }

+ 16 - 5
rafthttp/peer.go

@@ -15,7 +15,6 @@
 package rafthttp
 
 import (
-	"net/http"
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -65,6 +64,10 @@ type Peer interface {
 
 	// update updates the urls of remote peer.
 	update(urls types.URLs)
+
+	// urls retrieves the urls of the remote peer
+	urls() types.URLs
+
 	// attachOutgoingConn attaches the outgoing connection to the peer for
 	// stream usage. After the call, the ownership of the outgoing
 	// connection hands over to the peer. The peer will close the connection
@@ -97,6 +100,8 @@ type peer struct {
 
 	status *peerStatus
 
+	picker *urlPicker
+
 	msgAppV2Writer *streamWriter
 	writer         *streamWriter
 	pipeline       *pipeline
@@ -116,14 +121,16 @@ type peer struct {
 	done  chan struct{}
 }
 
-func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
-	picker := newURLPicker(urls)
+func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
 	status := newPeerStatus(to)
+	picker := newURLPicker(urls)
+	pipelineRt := transport.pipelineRt
 	p := &peer{
 		id:             to,
 		r:              r,
 		v3demo:         v3demo,
 		status:         status,
+		picker:         picker,
 		msgAppV2Writer: startStreamWriter(to, status, fs, r),
 		writer:         startStreamWriter(to, status, fs, r),
 		pipeline:       newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
@@ -154,8 +161,8 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
 		}
 	}()
 
-	p.msgAppV2Reader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
-	reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
+	p.msgAppV2Reader = startStreamReader(p, transport.streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
+	reader := startStreamReader(p, transport.streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
 	go func() {
 		var paused bool
 		for {
@@ -222,6 +229,10 @@ func (p *peer) update(urls types.URLs) {
 	}
 }
 
+func (p *peer) urls() types.URLs {
+	return p.picker.urls
+}
+
 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
 	var ok bool
 	switch conn.t {

+ 21 - 13
rafthttp/stream.go

@@ -226,6 +226,7 @@ func (cw *streamWriter) stop() {
 // streamReader is a long-running go-routine that dials to the remote stream
 // endpoint and reads messages from the response body returned.
 type streamReader struct {
+	localPeer     Peer
 	tr            http.RoundTripper
 	picker        *urlPicker
 	t             streamType
@@ -243,20 +244,21 @@ type streamReader struct {
 	done   chan struct{}
 }
 
-func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
+func startStreamReader(p Peer, tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
 	r := &streamReader{
-		tr:     tr,
-		picker: picker,
-		t:      t,
-		local:  local,
-		remote: remote,
-		cid:    cid,
-		status: status,
-		recvc:  recvc,
-		propc:  propc,
-		errorc: errorc,
-		stopc:  make(chan struct{}),
-		done:   make(chan struct{}),
+		localPeer: p,
+		tr:        tr,
+		picker:    picker,
+		t:         t,
+		local:     local,
+		remote:    remote,
+		cid:       cid,
+		status:    status,
+		recvc:     recvc,
+		propc:     propc,
+		errorc:    errorc,
+		stopc:     make(chan struct{}),
+		done:      make(chan struct{}),
 	}
 	go r.run()
 	return r
@@ -372,6 +374,12 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
 	req.Header.Set("X-Raft-To", cr.remote.String())
 
+	var peerURLs []string
+	for _, url := range cr.localPeer.urls() {
+		peerURLs = append(peerURLs, url.String())
+	}
+	req.Header.Set("X-Server-Peers", strings.Join(peerURLs, ","))
+
 	cr.mu.Lock()
 	select {
 	case <-cr.stopc:

+ 22 - 17
rafthttp/stream_test.go

@@ -116,11 +116,12 @@ func TestStreamReaderDialRequest(t *testing.T) {
 	for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
 		tr := &roundTripperRecorder{}
 		sr := &streamReader{
-			tr:     tr,
-			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			local:  types.ID(1),
-			remote: types.ID(2),
-			cid:    types.ID(1),
+			tr:        tr,
+			localPeer: newFakePeer(),
+			picker:    mustNewURLPicker(t, []string{"http://localhost:2380"}),
+			local:     types.ID(1),
+			remote:    types.ID(2),
+			cid:       types.ID(1),
 		}
 		sr.dial(tt)
 
@@ -166,12 +167,13 @@ func TestStreamReaderDialResult(t *testing.T) {
 			err:    tt.err,
 		}
 		sr := &streamReader{
-			tr:     tr,
-			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			local:  types.ID(1),
-			remote: types.ID(2),
-			cid:    types.ID(1),
-			errorc: make(chan error, 1),
+			tr:        tr,
+			localPeer: newFakePeer(),
+			picker:    mustNewURLPicker(t, []string{"http://localhost:2380"}),
+			local:     types.ID(1),
+			remote:    types.ID(2),
+			cid:       types.ID(1),
+			errorc:    make(chan error, 1),
 		}
 
 		_, err := sr.dial(streamTypeMessage)
@@ -194,11 +196,12 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
 			header: http.Header{},
 		}
 		sr := &streamReader{
-			tr:     tr,
-			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			local:  types.ID(1),
-			remote: types.ID(2),
-			cid:    types.ID(1),
+			tr:        tr,
+			localPeer: newFakePeer(),
+			picker:    mustNewURLPicker(t, []string{"http://localhost:2380"}),
+			local:     types.ID(1),
+			remote:    types.ID(2),
+			cid:       types.ID(1),
 		}
 
 		_, err := sr.dial(typ)
@@ -254,7 +257,9 @@ func TestStream(t *testing.T) {
 		h.sw = sw
 
 		picker := mustNewURLPicker(t, []string{srv.URL})
-		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil)
+		tr := &http.Transport{}
+		peer := newFakePeer()
+		sr := startStreamReader(peer, tr, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil)
 		defer sr.stop()
 		// wait for stream to work
 		var writec chan<- raftpb.Message

+ 2 - 2
rafthttp/transport.go

@@ -140,7 +140,7 @@ func (t *Transport) Start() error {
 
 func (t *Transport) Handler() http.Handler {
 	pipelineHandler := newPipelineHandler(t.Raft, t.ClusterID)
-	streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID)
+	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
 	snapHandler := newSnapshotHandler(t.Raft, t.Snapshotter, t.ClusterID)
 	mux := http.NewServeMux()
 	mux.Handle(RaftPrefix, pipelineHandler)
@@ -226,7 +226,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	fs := t.LeaderStats.Follower(id.String())
-	t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo)
+	t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo)
 	addPeerToProber(t.prober, id.String(), us)
 }
 

+ 1 - 1
rafthttp/transport_test.go

@@ -120,7 +120,7 @@ func TestTransportUpdate(t *testing.T) {
 	u := "http://localhost:2380"
 	tr.UpdatePeer(types.ID(1), []string{u})
 	wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:2380"}))
-	if !reflect.DeepEqual(peer.urls, wurls) {
+	if !reflect.DeepEqual(peer.peerURLs, wurls) {
 		t.Errorf("urls = %+v, want %+v", peer.urls, wurls)
 	}
 }