log_unstable.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import pb "github.com/coreos/etcd/raft/raftpb"
  16. // unstable.entries[i] has raft log position i+unstable.offset.
  17. // Note that unstable.offset may be less than the highest log
  18. // position in storage; this means that the next write to storage
  19. // might need to truncate the log before persisting unstable.entries.
  20. type unstable struct {
  21. // the incoming unstable snapshot, if any.
  22. snapshot *pb.Snapshot
  23. // all entries that have not yet been written to storage.
  24. entries []pb.Entry
  25. offset uint64
  26. logger Logger
  27. }
  28. // maybeFirstIndex returns the index of the first possible entry in entries
  29. // if it has a snapshot.
  30. func (u *unstable) maybeFirstIndex() (uint64, bool) {
  31. if u.snapshot != nil {
  32. return u.snapshot.Metadata.Index + 1, true
  33. }
  34. return 0, false
  35. }
  36. // maybeLastIndex returns the last index if it has at least one
  37. // unstable entry or snapshot.
  38. func (u *unstable) maybeLastIndex() (uint64, bool) {
  39. if l := len(u.entries); l != 0 {
  40. return u.offset + uint64(l) - 1, true
  41. }
  42. if u.snapshot != nil {
  43. return u.snapshot.Metadata.Index, true
  44. }
  45. return 0, false
  46. }
  47. // maybeTerm returns the term of the entry at index i, if there
  48. // is any.
  49. func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
  50. if i < u.offset {
  51. if u.snapshot == nil {
  52. return 0, false
  53. }
  54. if u.snapshot.Metadata.Index == i {
  55. return u.snapshot.Metadata.Term, true
  56. }
  57. return 0, false
  58. }
  59. last, ok := u.maybeLastIndex()
  60. if !ok {
  61. return 0, false
  62. }
  63. if i > last {
  64. return 0, false
  65. }
  66. return u.entries[i-u.offset].Term, true
  67. }
  68. func (u *unstable) stableTo(i, t uint64) {
  69. gt, ok := u.maybeTerm(i)
  70. if !ok {
  71. return
  72. }
  73. // if i < offset, term is matched with the snapshot
  74. // only update the unstable entries if term is matched with
  75. // an unstable entry.
  76. if gt == t && i >= u.offset {
  77. u.entries = u.entries[i+1-u.offset:]
  78. u.offset = i + 1
  79. u.shrinkEntriesArray()
  80. }
  81. }
  82. // shrinkEntriesArray discards the underlying array used by the entries slice
  83. // if most of it isn't being used. This avoids holding references to a bunch of
  84. // potentially large entries that aren't needed anymore. Simply clearing the
  85. // entries wouldn't be safe because clients might still be using them.
  86. func (u *unstable) shrinkEntriesArray() {
  87. // We replace the array if we're using less than half of the space in
  88. // it. This number is fairly arbitrary, chosen as an attempt to balance
  89. // memory usage vs number of allocations. It could probably be improved
  90. // with some focused tuning.
  91. const lenMultiple = 2
  92. if len(u.entries) == 0 {
  93. u.entries = nil
  94. } else if len(u.entries)*lenMultiple < cap(u.entries) {
  95. newEntries := make([]pb.Entry, len(u.entries))
  96. copy(newEntries, u.entries)
  97. u.entries = newEntries
  98. }
  99. }
  100. func (u *unstable) stableSnapTo(i uint64) {
  101. if u.snapshot != nil && u.snapshot.Metadata.Index == i {
  102. u.snapshot = nil
  103. }
  104. }
  105. func (u *unstable) restore(s pb.Snapshot) {
  106. u.offset = s.Metadata.Index + 1
  107. u.entries = nil
  108. u.snapshot = &s
  109. }
  110. func (u *unstable) truncateAndAppend(ents []pb.Entry) {
  111. after := ents[0].Index
  112. switch {
  113. case after == u.offset+uint64(len(u.entries)):
  114. // after is the next index in the u.entries
  115. // directly append
  116. u.entries = append(u.entries, ents...)
  117. case after <= u.offset:
  118. u.logger.Infof("replace the unstable entries from index %d", after)
  119. // The log is being truncated to before our current offset
  120. // portion, so set the offset and replace the entries
  121. u.offset = after
  122. u.entries = ents
  123. default:
  124. // truncate to after and copy to u.entries
  125. // then append
  126. u.logger.Infof("truncate the unstable entries before index %d", after)
  127. u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
  128. u.entries = append(u.entries, ents...)
  129. }
  130. }
  131. func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
  132. u.mustCheckOutOfBounds(lo, hi)
  133. return u.entries[lo-u.offset : hi-u.offset]
  134. }
  135. // u.offset <= lo <= hi <= u.offset+len(u.offset)
  136. func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
  137. if lo > hi {
  138. u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
  139. }
  140. upper := u.offset + uint64(len(u.entries))
  141. if lo < u.offset || hi > upper {
  142. u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
  143. }
  144. }