Browse Source

rafthttp: rename streamClient -> streamReader

Xiang Li 11 years ago
parent
commit
95a661251d
1 changed files with 17 additions and 18 deletions
  1. 17 18
      rafthttp/streamer.go

+ 17 - 18
rafthttp/streamer.go

@@ -46,20 +46,20 @@ type stream struct {
 	sync.Mutex
 	sync.Mutex
 	w *streamWriter
 	w *streamWriter
 
 
-	client *streamClient
+	r *streamReader
 }
 }
 
 
 func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error {
 func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error {
-	if s.client != nil {
+	if s.r != nil {
 		panic("open: stream is open")
 		panic("open: stream is open")
 	}
 	}
 
 
-	c, err := newStreamClient(from, to, cid, term, tr, u, r)
+	c, err := newStreamReader(from, to, cid, term, tr, u, r)
 	if err != nil {
 	if err != nil {
 		log.Printf("stream: error opening stream: %v", err)
 		log.Printf("stream: error opening stream: %v", err)
 		return err
 		return err
 	}
 	}
-	s.client = c
+	s.r = c
 	return nil
 	return nil
 }
 }
 
 
@@ -112,10 +112,10 @@ func (s *stream) invalidate(term uint64) {
 			s.w = nil
 			s.w = nil
 		}
 		}
 	}
 	}
-	if s.client != nil {
-		if s.client.term < term {
-			s.client.stop()
-			s.client = nil
+	if s.r != nil {
+		if s.r.term < term {
+			s.r.stop()
+			s.r = nil
 		}
 		}
 	}
 	}
 }
 }
@@ -125,10 +125,10 @@ func (s *stream) stop() {
 }
 }
 
 
 func (s *stream) isOpen() bool {
 func (s *stream) isOpen() bool {
-	if s.client != nil && s.client.isStopped() {
-		s.client = nil
+	if s.r != nil && s.r.isStopped() {
+		s.r = nil
 	}
 	}
-	return s.client != nil
+	return s.r != nil
 }
 }
 
 
 type WriteFlusher interface {
 type WriteFlusher interface {
@@ -198,9 +198,8 @@ func (s *streamWriter) stop() {
 
 
 func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
 func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
 
 
-// TODO: rename it to streamReader.
 // TODO: move the raft interface out of the reader.
 // TODO: move the raft interface out of the reader.
-type streamClient struct {
+type streamReader struct {
 	id   types.ID
 	id   types.ID
 	to   types.ID
 	to   types.ID
 	term uint64
 	term uint64
@@ -212,8 +211,8 @@ type streamClient struct {
 
 
 // newStreamClient starts and returns a new started stream client.
 // newStreamClient starts and returns a new started stream client.
 // The caller should call stop when finished, to shut it down.
 // The caller should call stop when finished, to shut it down.
-func newStreamClient(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamClient, error) {
-	s := &streamClient{
+func newStreamReader(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamReader, error) {
+	s := &streamReader{
 		id:   id,
 		id:   id,
 		to:   to,
 		to:   to,
 		term: term,
 		term: term,
@@ -247,12 +246,12 @@ func newStreamClient(id, to, cid types.ID, term uint64, tr http.RoundTripper, u
 	return s, nil
 	return s, nil
 }
 }
 
 
-func (s *streamClient) stop() {
+func (s *streamReader) stop() {
 	s.closer.Close()
 	s.closer.Close()
 	<-s.done
 	<-s.done
 }
 }
 
 
-func (s *streamClient) isStopped() bool {
+func (s *streamReader) isStopped() bool {
 	select {
 	select {
 	case <-s.done:
 	case <-s.done:
 		return true
 		return true
@@ -261,7 +260,7 @@ func (s *streamClient) isStopped() bool {
 	}
 	}
 }
 }
 
 
-func (s *streamClient) handle(r io.Reader) {
+func (s *streamReader) handle(r io.Reader) {
 	defer func() {
 	defer func() {
 		close(s.done)
 		close(s.done)
 		log.Printf("rafthttp: client streaming to %s at term %d has been stopped", s.to, s.term)
 		log.Printf("rafthttp: client streaming to %s at term %d has been stopped", s.to, s.term)