batcher.go 655 B

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. package rafthttp
  2. import (
  3. "time"
  4. "github.com/coreos/etcd/raft/raftpb"
  5. )
  6. type Batcher struct {
  7. batchedN int
  8. batchedT time.Time
  9. batchN int
  10. batchD time.Duration
  11. }
  12. func NewBatcher(n int, d time.Duration) *Batcher {
  13. return &Batcher{
  14. batchN: n,
  15. batchD: d,
  16. batchedT: time.Now(),
  17. }
  18. }
  19. func (b *Batcher) ShouldBatch(now time.Time) bool {
  20. b.batchedN++
  21. batchedD := now.Sub(b.batchedT)
  22. if b.batchedN >= b.batchN || batchedD >= b.batchD {
  23. b.Reset(now)
  24. return false
  25. }
  26. return true
  27. }
  28. func (b *Batcher) Reset(t time.Time) {
  29. b.batchedN = 0
  30. b.batchedT = t
  31. }
  32. func canBatch(m raftpb.Message) bool {
  33. return m.Type == raftpb.MsgAppResp
  34. }