|
@@ -98,10 +98,11 @@ type sender struct {
|
|
|
strmCln *streamClient
|
|
strmCln *streamClient
|
|
|
batcher *Batcher
|
|
batcher *Batcher
|
|
|
propBatcher *ProposalBatcher
|
|
propBatcher *ProposalBatcher
|
|
|
- strmSrv *streamServer
|
|
|
|
|
- strmSrvMu sync.Mutex
|
|
|
|
|
q chan []byte
|
|
q chan []byte
|
|
|
|
|
|
|
|
|
|
+ strmSrvMu sync.Mutex
|
|
|
|
|
+ strmSrv *streamServer
|
|
|
|
|
+
|
|
|
// wait for the handling routines
|
|
// wait for the handling routines
|
|
|
wg sync.WaitGroup
|
|
wg sync.WaitGroup
|
|
|
|
|
|
|
@@ -124,6 +125,7 @@ func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-cha
|
|
|
}
|
|
}
|
|
|
// stop the existing one
|
|
// stop the existing one
|
|
|
s.strmSrv.stop()
|
|
s.strmSrv.stop()
|
|
|
|
|
+ s.strmSrv = nil
|
|
|
}
|
|
}
|
|
|
s.strmSrv = startStreamServer(w, to, term, s.fs)
|
|
s.strmSrv = startStreamServer(w, to, term, s.fs)
|
|
|
return s.strmSrv.stopNotify(), nil
|
|
return s.strmSrv.stopNotify(), nil
|
|
@@ -199,6 +201,7 @@ func (s *sender) Stop() {
|
|
|
s.strmSrvMu.Lock()
|
|
s.strmSrvMu.Lock()
|
|
|
if s.strmSrv != nil {
|
|
if s.strmSrv != nil {
|
|
|
s.strmSrv.stop()
|
|
s.strmSrv.stop()
|
|
|
|
|
+ s.strmSrv = nil
|
|
|
}
|
|
}
|
|
|
s.strmSrvMu.Unlock()
|
|
s.strmSrvMu.Unlock()
|
|
|
if s.strmCln != nil {
|
|
if s.strmCln != nil {
|