log_unstable.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "log"
  17. pb "github.com/coreos/etcd/raft/raftpb"
  18. )
  19. // unstable.entris[i] has raft log position i+unstable.offset.
  20. // Note that unstable.offset may be less than the highest log
  21. // position in storage; this means that the next write to storage
  22. // might need to truncate the log before persisting unstable.entries.
  23. type unstable struct {
  24. // the incoming unstable snapshot, if any.
  25. snapshot *pb.Snapshot
  26. // all entries that have not yet been written to storage.
  27. entries []pb.Entry
  28. offset uint64
  29. }
  30. // maybeFirstIndex returns the index of the first possible entry in entries
  31. // if it has a snapshot.
  32. func (u *unstable) maybeFirstIndex() (uint64, bool) {
  33. if u.snapshot != nil {
  34. return u.snapshot.Metadata.Index + 1, true
  35. }
  36. return 0, false
  37. }
  38. // maybeLastIndex returns the last index if it has at least one
  39. // unstable entry or snapshot.
  40. func (u *unstable) maybeLastIndex() (uint64, bool) {
  41. if l := len(u.entries); l != 0 {
  42. return u.offset + uint64(l) - 1, true
  43. }
  44. if u.snapshot != nil {
  45. return u.snapshot.Metadata.Index, true
  46. }
  47. return 0, false
  48. }
  49. // myabeTerm returns the term of the entry at index i, if there
  50. // is any.
  51. func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
  52. if i < u.offset {
  53. if u.snapshot == nil {
  54. return 0, false
  55. }
  56. if u.snapshot.Metadata.Index == i {
  57. return u.snapshot.Metadata.Term, true
  58. }
  59. return 0, false
  60. }
  61. last, ok := u.maybeLastIndex()
  62. if !ok {
  63. return 0, false
  64. }
  65. if i > last {
  66. return 0, false
  67. }
  68. return u.entries[i-u.offset].Term, true
  69. }
  70. func (u *unstable) stableTo(i, t uint64) {
  71. gt, ok := u.maybeTerm(i)
  72. if !ok {
  73. return
  74. }
  75. // if i < offest, term is matched with the snapshot
  76. // only update the unstalbe entries if term is matched with
  77. // an unstable entry.
  78. if gt == t && i >= u.offset {
  79. u.entries = u.entries[i+1-u.offset:]
  80. u.offset = i + 1
  81. }
  82. }
  83. func (u *unstable) stableSnapTo(i uint64) {
  84. if u.snapshot != nil && u.snapshot.Metadata.Index == i {
  85. u.snapshot = nil
  86. }
  87. }
  88. func (u *unstable) restore(s pb.Snapshot) {
  89. u.offset = s.Metadata.Index + 1
  90. u.entries = nil
  91. u.snapshot = &s
  92. }
  93. func (u *unstable) truncateAndAppend(ents []pb.Entry) {
  94. after := ents[0].Index - 1
  95. switch {
  96. case after == u.offset+uint64(len(u.entries))-1:
  97. // after is the last index in the u.entries
  98. // directly append
  99. u.entries = append(u.entries, ents...)
  100. case after < u.offset:
  101. log.Printf("raftlog: replace the unstable entries from index %d", after+1)
  102. // The log is being truncated to before our current offset
  103. // portion, so set the offset and replace the entries
  104. u.offset = after + 1
  105. u.entries = ents
  106. default:
  107. // truncate to after and copy to u.entries
  108. // then append
  109. log.Printf("raftlog: truncate the unstable entries to index %d", after)
  110. u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
  111. u.entries = append(u.entries, ents...)
  112. }
  113. }
  114. func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
  115. u.mustCheckOutOfBounds(lo, hi)
  116. return u.entries[lo-u.offset : hi-u.offset]
  117. }
  118. // u.offset <= lo <= hi <= u.offset+len(u.offset)
  119. func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
  120. if lo > hi {
  121. log.Panicf("raft: invalid unstable.slice %d > %d", lo, hi)
  122. }
  123. upper := u.offset + uint64(len(u.entries))
  124. if lo < u.offset || hi > upper {
  125. log.Panicf("raft: unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
  126. }
  127. }