1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- // Copyright 2015 CoreOS, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package rafthttp
- import (
- "time"
- "github.com/coreos/etcd/raft/raftpb"
- )
- var (
- emptyMsgProp = raftpb.Message{Type: raftpb.MsgProp}
- )
- type Batcher struct {
- batchedN int
- batchedT time.Time
- batchN int
- batchD time.Duration
- }
- func NewBatcher(n int, d time.Duration) *Batcher {
- return &Batcher{
- batchN: n,
- batchD: d,
- batchedT: time.Now(),
- }
- }
- func (b *Batcher) ShouldBatch(now time.Time) bool {
- b.batchedN++
- batchedD := now.Sub(b.batchedT)
- if b.batchedN >= b.batchN || batchedD >= b.batchD {
- b.Reset(now)
- return false
- }
- return true
- }
- func (b *Batcher) Reset(t time.Time) {
- b.batchedN = 0
- b.batchedT = t
- }
- func canBatch(m raftpb.Message) bool {
- return m.Type == raftpb.MsgAppResp && m.Reject == false
- }
- type ProposalBatcher struct {
- *Batcher
- raftpb.Message
- }
- func NewProposalBatcher(n int, d time.Duration) *ProposalBatcher {
- return &ProposalBatcher{
- Batcher: NewBatcher(n, d),
- Message: emptyMsgProp,
- }
- }
- func (b *ProposalBatcher) Batch(m raftpb.Message) {
- b.Message.From = m.From
- b.Message.To = m.To
- b.Message.Entries = append(b.Message.Entries, m.Entries...)
- }
- func (b *ProposalBatcher) IsEmpty() bool {
- return len(b.Message.Entries) == 0
- }
- func (b *ProposalBatcher) Reset(t time.Time) {
- b.Batcher.Reset(t)
- b.Message = emptyMsgProp
- }
|