inflights.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright 2019 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tracker
  15. // Inflights limits the number of MsgApp (represented by the largest index
  16. // contained within) sent to followers but not yet acknowledged by them. Callers
  17. // use Full() to check whether more messages can be sent, call Add() whenever
  18. // they are sending a new append, and release "quota" via FreeLE() whenever an
  19. // ack is received.
  20. type Inflights struct {
  21. // the starting index in the buffer
  22. start int
  23. // number of inflights in the buffer
  24. count int
  25. // the size of the buffer
  26. size int
  27. // buffer contains the index of the last entry
  28. // inside one message.
  29. buffer []uint64
  30. }
  31. // NewInflights sets up an Inflights that allows up to 'size' inflight messages.
  32. func NewInflights(size int) *Inflights {
  33. return &Inflights{
  34. size: size,
  35. }
  36. }
  37. // Clone returns an *Inflights that is identical to but shares no memory with
  38. // the receiver.
  39. func (in *Inflights) Clone() *Inflights {
  40. ins := *in
  41. ins.buffer = append([]uint64(nil), in.buffer...)
  42. return &ins
  43. }
  44. // Add notifies the Inflights that a new message with the given index is being
  45. // dispatched. Full() must be called prior to Add() to verify that there is room
  46. // for one more message, and consecutive calls to add Add() must provide a
  47. // monotonic sequence of indexes.
  48. func (in *Inflights) Add(inflight uint64) {
  49. if in.Full() {
  50. panic("cannot add into a Full inflights")
  51. }
  52. next := in.start + in.count
  53. size := in.size
  54. if next >= size {
  55. next -= size
  56. }
  57. if next >= len(in.buffer) {
  58. in.grow()
  59. }
  60. in.buffer[next] = inflight
  61. in.count++
  62. }
  63. // grow the inflight buffer by doubling up to inflights.size. We grow on demand
  64. // instead of preallocating to inflights.size to handle systems which have
  65. // thousands of Raft groups per process.
  66. func (in *Inflights) grow() {
  67. newSize := len(in.buffer) * 2
  68. if newSize == 0 {
  69. newSize = 1
  70. } else if newSize > in.size {
  71. newSize = in.size
  72. }
  73. newBuffer := make([]uint64, newSize)
  74. copy(newBuffer, in.buffer)
  75. in.buffer = newBuffer
  76. }
  77. // FreeLE frees the inflights smaller or equal to the given `to` flight.
  78. func (in *Inflights) FreeLE(to uint64) {
  79. if in.count == 0 || to < in.buffer[in.start] {
  80. // out of the left side of the window
  81. return
  82. }
  83. idx := in.start
  84. var i int
  85. for i = 0; i < in.count; i++ {
  86. if to < in.buffer[idx] { // found the first large inflight
  87. break
  88. }
  89. // increase index and maybe rotate
  90. size := in.size
  91. if idx++; idx >= size {
  92. idx -= size
  93. }
  94. }
  95. // free i inflights and set new start index
  96. in.count -= i
  97. in.start = idx
  98. if in.count == 0 {
  99. // inflights is empty, reset the start index so that we don't grow the
  100. // buffer unnecessarily.
  101. in.start = 0
  102. }
  103. }
  104. // FreeFirstOne releases the first inflight. This is a no-op if nothing is
  105. // inflight.
  106. func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
  107. // Full returns true if no more messages can be sent at the moment.
  108. func (in *Inflights) Full() bool {
  109. return in.count == in.size
  110. }
  111. // Count returns the number of inflight messages.
  112. func (in *Inflights) Count() int { return in.count }
  113. // reset frees all inflights.
  114. func (in *Inflights) reset() {
  115. in.count = 0
  116. in.start = 0
  117. }