|
|
@@ -39,6 +39,7 @@ const (
|
|
|
senderBufSize = 64
|
|
|
|
|
|
appRespBatchMs = 50
|
|
|
+ propBatchMs = 10
|
|
|
|
|
|
ConnReadTimeout = 5 * time.Second
|
|
|
ConnWriteTimeout = 5 * time.Second
|
|
|
@@ -66,14 +67,15 @@ type Sender interface {
|
|
|
|
|
|
func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
|
|
|
s := &sender{
|
|
|
- tr: tr,
|
|
|
- u: u,
|
|
|
- cid: cid,
|
|
|
- p: p,
|
|
|
- fs: fs,
|
|
|
- shouldstop: shouldstop,
|
|
|
- batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
|
|
|
- q: make(chan []byte, senderBufSize),
|
|
|
+ tr: tr,
|
|
|
+ u: u,
|
|
|
+ cid: cid,
|
|
|
+ p: p,
|
|
|
+ fs: fs,
|
|
|
+ shouldstop: shouldstop,
|
|
|
+ batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
|
|
|
+ propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
|
|
|
+ q: make(chan []byte, senderBufSize),
|
|
|
}
|
|
|
s.wg.Add(connPerSender)
|
|
|
for i := 0; i < connPerSender; i++ {
|
|
|
@@ -90,11 +92,12 @@ type sender struct {
|
|
|
fs *stats.FollowerStats
|
|
|
shouldstop chan struct{}
|
|
|
|
|
|
- strmCln *streamClient
|
|
|
- batcher *Batcher
|
|
|
- strmSrv *streamServer
|
|
|
- strmSrvMu sync.Mutex
|
|
|
- q chan []byte
|
|
|
+ strmCln *streamClient
|
|
|
+ batcher *Batcher
|
|
|
+ propBatcher *ProposalBatcher
|
|
|
+ strmSrv *streamServer
|
|
|
+ strmSrvMu sync.Mutex
|
|
|
+ q chan []byte
|
|
|
|
|
|
paused bool
|
|
|
mu sync.RWMutex
|
|
|
@@ -136,16 +139,37 @@ func (s *sender) Send(m raftpb.Message) error {
|
|
|
s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
|
|
|
s.batcher.Reset(time.Now())
|
|
|
}
|
|
|
- if canBatch(m) && s.hasStreamClient() {
|
|
|
- if s.batcher.ShouldBatch(time.Now()) {
|
|
|
- return nil
|
|
|
+
|
|
|
+ var err error
|
|
|
+ switch {
|
|
|
+ case isProposal(m):
|
|
|
+ s.propBatcher.Batch(m)
|
|
|
+ case canBatch(m) && s.hasStreamClient():
|
|
|
+ if !s.batcher.ShouldBatch(time.Now()) {
|
|
|
+ err = s.send(m)
|
|
|
+ }
|
|
|
+ case canUseStream(m):
|
|
|
+ if ok := s.tryStream(m); !ok {
|
|
|
+ err = s.send(m)
|
|
|
}
|
|
|
+ default:
|
|
|
+ err = s.send(m)
|
|
|
}
|
|
|
- if canUseStream(m) {
|
|
|
- if ok := s.tryStream(m); ok {
|
|
|
- return nil
|
|
|
+ // send out batched MsgProp if needed
|
|
|
+ // TODO: it is triggered by all outcoming send now, and it needs
|
|
|
+ // more clear solution. Either use separate goroutine to trigger it
|
|
|
+ // or use streaming.
|
|
|
+ if !s.propBatcher.IsEmpty() {
|
|
|
+ t := time.Now()
|
|
|
+ if !s.propBatcher.ShouldBatch(t) {
|
|
|
+ s.send(s.propBatcher.Message)
|
|
|
+ s.propBatcher.Reset(t)
|
|
|
}
|
|
|
}
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (s *sender) send(m raftpb.Message) error {
|
|
|
// TODO: don't block. we should be able to have 1000s
|
|
|
// of messages out at a time.
|
|
|
data := pbutil.MustMarshal(&m)
|
|
|
@@ -281,3 +305,5 @@ func (s *sender) post(data []byte) error {
|
|
|
return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func isProposal(m raftpb.Message) bool { return m.Type == raftpb.MsgProp }
|