123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- // Copyright 2019 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package tracker
// Inflights is a bounded window over the MsgApp messages that have been sent
// to a follower but not acknowledged yet. Each message is represented by the
// largest entry index it carries. The expected usage is: check Full() before
// sending, record every new append with Add(), and give capacity back with
// FreeLE() once an acknowledgement arrives.
type Inflights struct {
	// start is the position in buffer of the oldest inflight message.
	start int
	// count is the number of messages currently inflight.
	count int
	// size is the maximum number of messages that may be inflight at once; it
	// is also the upper bound on len(buffer).
	size int

	// buffer is used as a circular buffer: positions wrap around at size.
	// Every occupied slot stores the index of the last entry contained in
	// one inflight message. It is allocated on demand by grow().
	buffer []uint64
}
- // NewInflights sets up an Inflights that allows up to 'size' inflight messages.
- func NewInflights(size int) *Inflights {
- return &Inflights{
- size: size,
- }
- }
- // Clone returns an *Inflights that is identical to but shares no memory with
- // the receiver.
- func (in *Inflights) Clone() *Inflights {
- ins := *in
- ins.buffer = append([]uint64(nil), in.buffer...)
- return &ins
- }
- // Add notifies the Inflights that a new message with the given index is being
- // dispatched. Full() must be called prior to Add() to verify that there is room
- // for one more message, and consecutive calls to add Add() must provide a
- // monotonic sequence of indexes.
- func (in *Inflights) Add(inflight uint64) {
- if in.Full() {
- panic("cannot add into a Full inflights")
- }
- next := in.start + in.count
- size := in.size
- if next >= size {
- next -= size
- }
- if next >= len(in.buffer) {
- in.grow()
- }
- in.buffer[next] = inflight
- in.count++
- }
- // grow the inflight buffer by doubling up to inflights.size. We grow on demand
- // instead of preallocating to inflights.size to handle systems which have
- // thousands of Raft groups per process.
- func (in *Inflights) grow() {
- newSize := len(in.buffer) * 2
- if newSize == 0 {
- newSize = 1
- } else if newSize > in.size {
- newSize = in.size
- }
- newBuffer := make([]uint64, newSize)
- copy(newBuffer, in.buffer)
- in.buffer = newBuffer
- }
- // FreeLE frees the inflights smaller or equal to the given `to` flight.
- func (in *Inflights) FreeLE(to uint64) {
- if in.count == 0 || to < in.buffer[in.start] {
- // out of the left side of the window
- return
- }
- idx := in.start
- var i int
- for i = 0; i < in.count; i++ {
- if to < in.buffer[idx] { // found the first large inflight
- break
- }
- // increase index and maybe rotate
- size := in.size
- if idx++; idx >= size {
- idx -= size
- }
- }
- // free i inflights and set new start index
- in.count -= i
- in.start = idx
- if in.count == 0 {
- // inflights is empty, reset the start index so that we don't grow the
- // buffer unnecessarily.
- in.start = 0
- }
- }
- // FreeFirstOne releases the first inflight. This is a no-op if nothing is
- // inflight.
- func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
- // Full returns true if no more messages can be sent at the moment.
- func (in *Inflights) Full() bool {
- return in.count == in.size
- }
- // Count returns the number of inflight messages.
- func (in *Inflights) Count() int { return in.count }
- // reset frees all inflights.
- func (in *Inflights) reset() {
- in.count = 0
- in.start = 0
- }
|