Browse Source

rafthttp: streamserver -> streamwriter

Xiang Li 11 years ago
parent
commit
fe53ffd74d
3 changed files with 30 additions and 31 deletions
  1. 3 3
      rafthttp/http.go
  2. 3 3
      rafthttp/peer.go
  3. 24 25
      rafthttp/streamer.go

+ 3 - 3
rafthttp/http.go

@@ -159,14 +159,14 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.(http.Flusher).Flush()
 	w.(http.Flusher).Flush()
 
 
-	stream := newStreamServer(w.(WriteFlusher), from, term)
-	err = p.attachStream(stream)
+	sw := newStreamWriter(w.(WriteFlusher), from, term)
+	err = p.attachStream(sw)
 	if err != nil {
 	if err != nil {
 		log.Printf("rafthttp: %v", err)
 		log.Printf("rafthttp: %v", err)
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
 		return
 	}
 	}
-	<-stream.stopNotify()
+	<-sw.stopNotify()
 }
 }
 
 
 type writerToResponse interface {
 type writerToResponse interface {

+ 3 - 3
rafthttp/peer.go

@@ -246,9 +246,9 @@ func (p *peer) post(data []byte) error {
 }
 }
 
 
 // attachStream attaches a streamSever to the peer.
 // attachStream attaches a streamSever to the peer.
-func (p *peer) attachStream(server *streamServer) error {
-	server.fs = p.fs
-	return p.stream.attach(server)
+func (p *peer) attachStream(sw *streamWriter) error {
+	sw.fs = p.fs
+	return p.stream.attach(sw)
 }
 }
 
 
 // Pause pauses the peer. The peer will simply drops all incoming
 // Pause pauses the peer. The peer will simply drops all incoming

+ 24 - 25
rafthttp/streamer.go

@@ -44,7 +44,7 @@ type stream struct {
 	// the server might be attached asynchronously with the owner of the stream
 	// the server might be attached asynchronously with the owner of the stream
 	// use a mutex to protect it
 	// use a mutex to protect it
 	sync.Mutex
 	sync.Mutex
-	server *streamServer
+	w *streamWriter
 
 
 	client *streamClient
 	client *streamClient
 }
 }
@@ -63,38 +63,38 @@ func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper,
 	return nil
 	return nil
 }
 }
 
 
-func (s *stream) attach(server *streamServer) error {
+func (s *stream) attach(sw *streamWriter) error {
 	s.Lock()
 	s.Lock()
 	defer s.Unlock()
 	defer s.Unlock()
-	if s.server != nil {
+	if s.w != nil {
 		// ignore lower-term streaming request
 		// ignore lower-term streaming request
-		if server.term < s.server.term {
-			return fmt.Errorf("cannot attach out of data stream server [%d / %d]", server.term, s.server.term)
+		if sw.term < s.w.term {
+			return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
 		}
 		}
-		s.server.stop()
+		s.w.stop()
 	}
 	}
-	s.server = server
+	s.w = sw
 	return nil
 	return nil
 }
 }
 
 
 func (s *stream) write(m raftpb.Message) bool {
 func (s *stream) write(m raftpb.Message) bool {
 	s.Lock()
 	s.Lock()
 	defer s.Unlock()
 	defer s.Unlock()
-	if s.server == nil {
+	if s.w == nil {
 		return false
 		return false
 	}
 	}
-	if m.Term != s.server.term {
-		if m.Term > s.server.term {
+	if m.Term != s.w.term {
+		if m.Term > s.w.term {
 			panic("expected server to be invalidated when there is a higher term message")
 			panic("expected server to be invalidated when there is a higher term message")
 		}
 		}
 		return false
 		return false
 	}
 	}
 	// todo: early unlock?
 	// todo: early unlock?
-	if err := s.server.send(m.Entries); err != nil {
+	if err := s.w.send(m.Entries); err != nil {
 		log.Printf("stream: error sending message: %v", err)
 		log.Printf("stream: error sending message: %v", err)
 		log.Printf("stream: stopping the stream server...")
 		log.Printf("stream: stopping the stream server...")
-		s.server.stop()
-		s.server = nil
+		s.w.stop()
+		s.w = nil
 		return false
 		return false
 	}
 	}
 	return true
 	return true
@@ -106,10 +106,10 @@ func (s *stream) invalidate(term uint64) {
 	s.Lock()
 	s.Lock()
 	defer s.Unlock()
 	defer s.Unlock()
 
 
-	if s.server != nil {
-		if s.server.term < term {
-			s.server.stop()
-			s.server = nil
+	if s.w != nil {
+		if s.w.term < term {
+			s.w.stop()
+			s.w = nil
 		}
 		}
 	}
 	}
 	if s.client != nil {
 	if s.client != nil {
@@ -136,9 +136,8 @@ type WriteFlusher interface {
 	http.Flusher
 	http.Flusher
 }
 }
 
 
-// TODO: rename it to streamWriter.
 // TODO: replace fs with stream stats
 // TODO: replace fs with stream stats
-type streamServer struct {
+type streamWriter struct {
 	to   types.ID
 	to   types.ID
 	term uint64
 	term uint64
 	fs   *stats.FollowerStats
 	fs   *stats.FollowerStats
@@ -148,8 +147,8 @@ type streamServer struct {
 
 
 // newStreamServer starts and returns a new started stream server.
 // newStreamServer starts and returns a new started stream server.
 // The caller should call stop when finished, to shut it down.
 // The caller should call stop when finished, to shut it down.
-func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer {
-	s := &streamServer{
+func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter {
+	s := &streamWriter{
 		to:   to,
 		to:   to,
 		term: term,
 		term: term,
 		q:    make(chan []raftpb.Entry, streamBufSize),
 		q:    make(chan []raftpb.Entry, streamBufSize),
@@ -159,7 +158,7 @@ func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer {
 	return s
 	return s
 }
 }
 
 
-func (s *streamServer) send(ents []raftpb.Entry) error {
+func (s *streamWriter) send(ents []raftpb.Entry) error {
 	select {
 	select {
 	case <-s.done:
 	case <-s.done:
 		return fmt.Errorf("stopped")
 		return fmt.Errorf("stopped")
@@ -174,7 +173,7 @@ func (s *streamServer) send(ents []raftpb.Entry) error {
 	}
 	}
 }
 }
 
 
-func (s *streamServer) handle(w WriteFlusher) {
+func (s *streamWriter) handle(w WriteFlusher) {
 	defer func() {
 	defer func() {
 		close(s.done)
 		close(s.done)
 		log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
 		log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
@@ -192,12 +191,12 @@ func (s *streamServer) handle(w WriteFlusher) {
 	}
 	}
 }
 }
 
 
-func (s *streamServer) stop() {
+func (s *streamWriter) stop() {
 	close(s.q)
 	close(s.q)
 	<-s.done
 	<-s.done
 }
 }
 
 
-func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
+func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
 
 
 // TODO: rename it to streamReader.
 // TODO: rename it to streamReader.
 // TODO: move the raft interface out of the reader.
 // TODO: move the raft interface out of the reader.