Browse Source

rafthttp: add run loop for peer

Yicheng Qin 11 years ago
parent
commit
55cd03ff4b
3 changed files with 112 additions and 64 deletions
  1. 1 2
      rafthttp/http.go
  2. 105 58
      rafthttp/peer.go
  3. 6 4
      rafthttp/streamer.go

+ 1 - 2
rafthttp/http.go

@@ -153,7 +153,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	sw := newStreamWriter(from, term)
+	sw := newStreamWriter(w.(WriteFlusher), from, term)
 	err = p.attachStream(sw)
 	if err != nil {
 		log.Printf("rafthttp: %v", err)
@@ -163,7 +163,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	w.WriteHeader(http.StatusOK)
 	w.(http.Flusher).Flush()
-	go sw.handle(w.(WriteFlusher))
 	<-sw.stopNotify()
 }
 

+ 105 - 58
rafthttp/peer.go

@@ -15,7 +15,8 @@
 package rafthttp
 
 import (
-	"errors"
+	"fmt"
+	"log"
 	"net/http"
 	"sync"
 	"time"
@@ -51,12 +52,17 @@ type peer struct {
 	pipeline *pipeline
 	stream   *stream
 
-	paused  bool
-	stopped bool
+	sendc   chan raftpb.Message
+	updatec chan string
+	attachc chan *streamWriter
+	pausec  chan struct{}
+	resumec chan struct{}
+	stopc   chan struct{}
+	done    chan struct{}
 }
 
 func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
-	return &peer{
+	p := &peer{
 		id:          id,
 		cid:         cid,
 		tr:          tr,
@@ -67,33 +73,111 @@ func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft,
 		stream:      &stream{},
 		batcher:     NewBatcher(100, appRespBatchMs*time.Millisecond),
 		propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
+
+		sendc:   make(chan raftpb.Message),
+		updatec: make(chan string),
+		attachc: make(chan *streamWriter),
+		pausec:  make(chan struct{}),
+		resumec: make(chan struct{}),
+		stopc:   make(chan struct{}),
+		done:    make(chan struct{}),
 	}
+	go p.run()
+	return p
 }
 
-func (p *peer) Update(u string) {
-	p.Lock()
-	defer p.Unlock()
-	if p.stopped {
-		// TODO: not panic here?
-		panic("peer: update a stopped peer")
+func (p *peer) run() {
+	var paused bool
+	// non-blocking main loop
+	for {
+		select {
+		case m := <-p.sendc:
+			if paused {
+				continue
+			}
+			p.send(m)
+		case u := <-p.updatec:
+			p.u = u
+			p.pipeline.update(u)
+		case sw := <-p.attachc:
+			sw.fs = p.fs
+			if err := p.stream.attach(sw); err != nil {
+				sw.stop()
+				continue
+			}
+			go sw.handle()
+		case <-p.pausec:
+			paused = true
+		case <-p.resumec:
+			paused = false
+		case <-p.stopc:
+			p.pipeline.stop()
+			p.stream.stop()
+			close(p.done)
+			return
+		}
 	}
-	p.u = u
-	p.pipeline.update(u)
 }
 
-// Send sends the data to the remote node. It is always non-blocking.
-// It may be fail to send data if it returns nil error.
-// TODO (xiangli): reasonable retry logic
-func (p *peer) Send(m raftpb.Message) error {
-	p.Lock()
-	defer p.Unlock()
-	if p.stopped {
-		return errors.New("peer: stopped")
+func (p *peer) Send(m raftpb.Message) {
+	select {
+	case p.sendc <- m:
+	case <-p.done:
+		log.Panicf("peer: unexpected stopped")
 	}
-	if p.paused {
+}
+
+func (p *peer) Update(u string) {
+	select {
+	case p.updatec <- u:
+	case <-p.done:
+		log.Panicf("peer: unexpected stopped")
+	}
+}
+
+// attachStream attaches a streamWriter to the peer.
+// If attach succeeds, peer will take charge of the given streamWriter.
+func (p *peer) attachStream(sw *streamWriter) error {
+	select {
+	case p.attachc <- sw:
 		return nil
+	case <-p.done:
+		return fmt.Errorf("peer: stopped")
+	}
+}
+
+// Pause pauses the peer. The peer will simply drops all incoming
+// messages without retruning an error.
+func (p *peer) Pause() {
+	select {
+	case p.pausec <- struct{}{}:
+	case <-p.done:
 	}
+}
 
+// Resume resumes a paused peer.
+func (p *peer) Resume() {
+	select {
+	case p.resumec <- struct{}{}:
+	case <-p.done:
+	}
+}
+
+// Stop performs any necessary finalization and terminates the peer
+// elegantly.
+func (p *peer) Stop() {
+	select {
+	case p.stopc <- struct{}{}:
+	case <-p.done:
+		return
+	}
+	<-p.done
+}
+
+// send sends the data to the remote node. It is always non-blocking.
+// It may be fail to send data if it returns nil error.
+// TODO (xiangli): reasonable retry logic
+func (p *peer) send(m raftpb.Message) error {
 	// move all the stream related stuff into stream
 	p.stream.invalidate(m.Term)
 	if shouldInitStream(m) && !p.stream.isOpen() {
@@ -132,41 +216,4 @@ func (p *peer) Send(m raftpb.Message) error {
 	return err
 }
 
-// Stop performs any necessary finalization and terminates the peer
-// elegantly.
-func (p *peer) Stop() {
-	p.Lock()
-	defer p.Unlock()
-	p.pipeline.stop()
-	p.stream.stop()
-	p.stopped = true
-}
-
-// attachStream attaches a streamSever to the peer.
-func (p *peer) attachStream(sw *streamWriter) error {
-	p.Lock()
-	defer p.Unlock()
-	if p.stopped {
-		return errors.New("peer: stopped")
-	}
-
-	sw.fs = p.fs
-	return p.stream.attach(sw)
-}
-
-// Pause pauses the peer. The peer will simply drops all incoming
-// messages without retruning an error.
-func (p *peer) Pause() {
-	p.Lock()
-	defer p.Unlock()
-	p.paused = true
-}
-
-// Resume resumes a paused peer.
-func (p *peer) Resume() {
-	p.Lock()
-	defer p.Unlock()
-	p.paused = false
-}
-
 func isProposal(m raftpb.Message) bool { return m.Type == raftpb.MsgProp }

+ 6 - 4
rafthttp/streamer.go

@@ -150,6 +150,7 @@ type WriteFlusher interface {
 
 // TODO: replace fs with stream stats
 type streamWriter struct {
+	w    WriteFlusher
 	to   types.ID
 	term uint64
 	fs   *stats.FollowerStats
@@ -159,8 +160,9 @@ type streamWriter struct {
 
 // newStreamWriter starts and returns a new unstarted stream writer.
 // The caller should call stop when finished, to shut it down.
-func newStreamWriter(to types.ID, term uint64) *streamWriter {
+func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter {
 	s := &streamWriter{
+		w:    w,
 		to:   to,
 		term: term,
 		q:    make(chan []raftpb.Entry, streamBufSize),
@@ -184,13 +186,13 @@ func (s *streamWriter) send(ents []raftpb.Entry) error {
 	}
 }
 
-func (s *streamWriter) handle(w WriteFlusher) {
+func (s *streamWriter) handle() {
 	defer func() {
 		close(s.done)
 		log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
 	}()
 
-	ew := newEntryWriter(w, s.to)
+	ew := newEntryWriter(s.w, s.to)
 	for ents := range s.q {
 		// Considering Commit in MsgApp is not recovered when received,
 		// zero-entry appendEntry messages have no use to raft state machine.
@@ -203,7 +205,7 @@ func (s *streamWriter) handle(w WriteFlusher) {
 			log.Printf("rafthttp: encountered error writing to server log stream: %v", err)
 			return
 		}
-		w.Flush()
+		s.w.Flush()
 		s.fs.Succ(time.Since(start))
 	}
 }