|
@@ -59,7 +59,7 @@ func startStreamServer(w WriteFlusher, to types.ID, term uint64, fs *stats.Follo
|
|
|
done: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
go s.handle(w)
|
|
go s.handle(w)
|
|
|
- log.Printf("rafthttp: stream server to %s at term %d starts", to, term)
|
|
|
|
|
|
|
+ log.Printf("rafthttp: starting server stream to %s at term %d", to, term)
|
|
|
return s
|
|
return s
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -73,8 +73,8 @@ func (s *streamServer) send(ents []raftpb.Entry) error {
|
|
|
case s.q <- ents:
|
|
case s.q <- ents:
|
|
|
return nil
|
|
return nil
|
|
|
default:
|
|
default:
|
|
|
- log.Printf("rafthttp: streamer reaches maximal serving to %s", s.to)
|
|
|
|
|
- return fmt.Errorf("reach maximal serving")
|
|
|
|
|
|
|
+ log.Printf("rafthttp: maximum number of streams %s reached", s.to)
|
|
|
|
|
+ return fmt.Errorf("maximum number of streams reached")
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -88,14 +88,14 @@ func (s *streamServer) stopNotify() <-chan struct{} { return s.done }
|
|
|
func (s *streamServer) handle(w WriteFlusher) {
|
|
func (s *streamServer) handle(w WriteFlusher) {
|
|
|
defer func() {
|
|
defer func() {
|
|
|
close(s.done)
|
|
close(s.done)
|
|
|
- log.Printf("rafthttp: stream server to %s at term %d is closed", s.to, s.term)
|
|
|
|
|
|
|
+ log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
ew := &entryWriter{w: w}
|
|
ew := &entryWriter{w: w}
|
|
|
for ents := range s.q {
|
|
for ents := range s.q {
|
|
|
start := time.Now()
|
|
start := time.Now()
|
|
|
if err := ew.writeEntries(ents); err != nil {
|
|
if err := ew.writeEntries(ents); err != nil {
|
|
|
- log.Printf("rafthttp: write ents error: %v", err)
|
|
|
|
|
|
|
+ log.Printf("rafthttp: encountered error writing to server log stream: %v", err)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
w.Flush()
|
|
w.Flush()
|
|
@@ -149,7 +149,7 @@ func (s *streamClient) start(tr http.RoundTripper, u string, cid types.ID) error
|
|
|
}
|
|
}
|
|
|
s.closer = resp.Body
|
|
s.closer = resp.Body
|
|
|
go s.handle(resp.Body)
|
|
go s.handle(resp.Body)
|
|
|
- log.Printf("rafthttp: stream client to %s at term %d starts", s.to, s.term)
|
|
|
|
|
|
|
+ log.Printf("rafthttp: starting client stream to %s at term %d", s.to, s.term)
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -170,7 +170,7 @@ func (s *streamClient) isStopped() bool {
|
|
|
func (s *streamClient) handle(r io.Reader) {
|
|
func (s *streamClient) handle(r io.Reader) {
|
|
|
defer func() {
|
|
defer func() {
|
|
|
close(s.done)
|
|
close(s.done)
|
|
|
- log.Printf("rafthttp: stream client to %s at term %d is closed", s.to, s.term)
|
|
|
|
|
|
|
+ log.Printf("rafthttp: client streaming to %s at term %d is been stopped", s.to, s.term)
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
er := &entryReader{r: r}
|
|
er := &entryReader{r: r}
|
|
@@ -178,7 +178,7 @@ func (s *streamClient) handle(r io.Reader) {
|
|
|
ents, err := er.readEntries()
|
|
ents, err := er.readEntries()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
if err != io.EOF {
|
|
if err != io.EOF {
|
|
|
- log.Printf("rafthttp: read ents error: %v", err)
|
|
|
|
|
|
|
+ log.Printf("rafthttp: encountered error reading the client log stream: %v", err)
|
|
|
}
|
|
}
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|