@@ -15,6 +15,7 @@
 package rafthttp
 
 import (
+	"sync"
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -104,18 +105,17 @@ type peer struct {
 	pipeline       *pipeline
 	snapSender     *snapshotSender // snapshot sender to send v3 snapshot messages
 	msgAppV2Reader *streamReader
+	msgAppReader   *streamReader
 
-	sendc    chan raftpb.Message
-	recvc    chan raftpb.Message
-	propc    chan raftpb.Message
-	newURLsC chan types.URLs
+	sendc chan raftpb.Message
+	recvc chan raftpb.Message
+	propc chan raftpb.Message
 
-	// for testing
-	pausec  chan struct{}
-	resumec chan struct{}
+	mu     sync.Mutex
+	paused bool
 
-	stopc chan struct{}
-	done  chan struct{}
+	cancel context.CancelFunc // cancels pending work in goroutines created by peer
+	stopc  chan struct{}
 }
 
 func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
@@ -134,16 +134,11 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 		sendc:          make(chan raftpb.Message),
 		recvc:          make(chan raftpb.Message, recvBufSize),
 		propc:          make(chan raftpb.Message, maxPendingProposals),
-		newURLsC:       make(chan types.URLs),
-		pausec:         make(chan struct{}),
-		resumec:        make(chan struct{}),
 		stopc:          make(chan struct{}),
-		done:           make(chan struct{}),
 	}
 
-	// Use go-routine for process of MsgProp because it is
-	// blocking when there is no leader.
 	ctx, cancel := context.WithCancel(context.Background())
+	p.cancel = cancel
 	go func() {
 		for {
 			select {
@@ -151,66 +146,43 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 				if err := r.Process(ctx, mm); err != nil {
 					plog.Warningf("failed to process raft message (%v)", err)
 				}
-			case <-p.stopc:
-				return
-			}
-		}
-	}()
-
-	p.msgAppV2Reader = startStreamReader(transport, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
-	reader := startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
-	go func() {
-		var paused bool
-		for {
-			select {
-			case m := <-p.sendc:
-				if paused {
-					continue
-				}
-				writec, name := p.pick(m)
-				select {
-				case writec <- m:
-				default:
-					p.r.ReportUnreachable(m.To)
-					if isMsgSnap(m) {
-						p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
-					}
-					if status.isActive() {
-						plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
-					}
-					plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
-				}
 			case mm := <-p.recvc:
-				if err := r.Process(context.TODO(), mm); err != nil {
+				if err := r.Process(ctx, mm); err != nil {
 					plog.Warningf("failed to process raft message (%v)", err)
 				}
-			case urls := <-p.newURLsC:
-				picker.update(urls)
-			case <-p.pausec:
-				paused = true
-			case <-p.resumec:
-				paused = false
 			case <-p.stopc:
-				cancel()
-				p.msgAppV2Writer.stop()
-				p.writer.stop()
-				p.pipeline.stop()
-				p.snapSender.stop()
-				p.msgAppV2Reader.stop()
-				reader.stop()
-				close(p.done)
 				return
 			}
 		}
 	}()
 
+	p.msgAppV2Reader = startStreamReader(transport, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
+	p.msgAppReader = startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
+
 	return p
 }
 
 func (p *peer) send(m raftpb.Message) {
+	p.mu.Lock()
+	paused := p.paused
+	p.mu.Unlock()
+
+	if paused {
+		return
+	}
+
+	writec, name := p.pick(m)
 	select {
-	case p.sendc <- m:
-	case <-p.done:
+	case writec <- m:
+	default:
+		p.r.ReportUnreachable(m.To)
+		if isMsgSnap(m) {
+			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+		}
+		if p.status.isActive() {
+			plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
+		}
+		plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
 	}
 }
 
@@ -219,10 +191,7 @@ func (p *peer) sendSnap(m snap.Message) {
 }
 
 func (p *peer) update(urls types.URLs) {
-	select {
-	case p.newURLsC <- urls:
-	case <-p.done:
-	}
+	p.picker.update(urls)
 }
 
 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
@@ -245,23 +214,27 @@ func (p *peer) activeSince() time.Time { return p.status.activeSince }
 // Pause pauses the peer. The peer will simply drop all incoming
 // messages without returning an error.
 func (p *peer) Pause() {
-	select {
-	case p.pausec <- struct{}{}:
-	case <-p.done:
-	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = true
 }
 
 // Resume resumes a paused peer.
 func (p *peer) Resume() {
-	select {
-	case p.resumec <- struct{}{}:
-	case <-p.done:
-	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = false
 }
 
 func (p *peer) stop() {
 	close(p.stopc)
-	<-p.done
+	p.cancel()
+	p.msgAppV2Writer.stop()
+	p.writer.stop()
+	p.pipeline.stop()
+	p.snapSender.stop()
+	p.msgAppV2Reader.stop()
+	p.msgAppReader.stop()
 }
 
 // pick picks a chan for sending the given message. The picked chan and the picked chan
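
For reference, a minimal, self-contained sketch of the two patterns this patch moves to: Pause/Resume flip a mutex-guarded flag that the caller's send path reads, replacing the pausec/resumec channels and their goroutine, and sends stay non-blocking by dropping the message when the picked channel's buffer is full. Names like `writec` and the `string` message type are placeholders for illustration, not the etcd API.

```go
package main

import (
	"fmt"
	"sync"
)

type peer struct {
	mu     sync.Mutex
	paused bool
	writec chan string // stands in for the channel returned by pick
}

// Pause makes send drop all messages without returning an error.
func (p *peer) Pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = true
}

// Resume lets send deliver messages again.
func (p *peer) Resume() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = false
}

func (p *peer) send(m string) {
	// Read the flag under the mutex; no dedicated goroutine needed.
	p.mu.Lock()
	paused := p.paused
	p.mu.Unlock()
	if paused {
		return // dropped silently while paused
	}
	select {
	case p.writec <- m: // buffered send succeeds immediately
	default: // buffer full: drop rather than block the caller
		fmt.Println("dropped:", m)
	}
}

func main() {
	p := &peer{writec: make(chan string, 1)}
	p.send("a") // buffered
	p.send("b") // dropped: buffer full
	p.Pause()
	p.send("c") // dropped: paused
	p.Resume()
	fmt.Println("delivered:", <-p.writec)
}
```

Because send now runs entirely on the caller's goroutine, stop() no longer needs a done channel to wait on; it just closes stopc, cancels the context, and stops the writers and readers directly, as the last hunk shows.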