batcher.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package rafthttp
  2. import (
  3. "time"
  4. "github.com/coreos/etcd/raft/raftpb"
  5. )
  6. var (
  7. emptyMsgProp = raftpb.Message{Type: raftpb.MsgProp}
  8. )
  9. type Batcher struct {
  10. batchedN int
  11. batchedT time.Time
  12. batchN int
  13. batchD time.Duration
  14. }
  15. func NewBatcher(n int, d time.Duration) *Batcher {
  16. return &Batcher{
  17. batchN: n,
  18. batchD: d,
  19. batchedT: time.Now(),
  20. }
  21. }
  22. func (b *Batcher) ShouldBatch(now time.Time) bool {
  23. b.batchedN++
  24. batchedD := now.Sub(b.batchedT)
  25. if b.batchedN >= b.batchN || batchedD >= b.batchD {
  26. b.Reset(now)
  27. return false
  28. }
  29. return true
  30. }
  31. func (b *Batcher) Reset(t time.Time) {
  32. b.batchedN = 0
  33. b.batchedT = t
  34. }
  35. func canBatch(m raftpb.Message) bool {
  36. return m.Type == raftpb.MsgAppResp && m.Reject == false
  37. }
  38. type ProposalBatcher struct {
  39. *Batcher
  40. raftpb.Message
  41. }
  42. func NewProposalBatcher(n int, d time.Duration) *ProposalBatcher {
  43. return &ProposalBatcher{
  44. Batcher: NewBatcher(n, d),
  45. Message: emptyMsgProp,
  46. }
  47. }
  48. func (b *ProposalBatcher) Batch(m raftpb.Message) {
  49. b.Message.From = m.From
  50. b.Message.To = m.To
  51. b.Message.Entries = append(b.Message.Entries, m.Entries...)
  52. }
  53. func (b *ProposalBatcher) IsEmpty() bool {
  54. return len(b.Message.Entries) == 0
  55. }
  56. func (b *ProposalBatcher) Reset(t time.Time) {
  57. b.Batcher.Reset(t)
  58. b.Message = emptyMsgProp
  59. }