log_unstable.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "log"
  16. pb "github.com/coreos/etcd/raft/raftpb"
  17. )
  18. // unstable.entris[i] has raft log position i+unstable.offset.
  19. // Note that unstable.offset may be less than the highest log
  20. // position in storage; this means that the next write to storage
  21. // might need to truncate the log before persisting unstable.entries.
  22. type unstable struct {
  23. // the incoming unstable snapshot, if any.
  24. snapshot *pb.Snapshot
  25. // all entries that have not yet been written to storage.
  26. entries []pb.Entry
  27. offset uint64
  28. }
  29. // maybeFirstIndex returns the index of the first possible entry in entries
  30. // if it has a snapshot.
  31. func (u *unstable) maybeFirstIndex() (uint64, bool) {
  32. if u.snapshot != nil {
  33. return u.snapshot.Metadata.Index + 1, true
  34. }
  35. return 0, false
  36. }
  37. // maybeLastIndex returns the last index if it has at least one
  38. // unstable entry or snapshot.
  39. func (u *unstable) maybeLastIndex() (uint64, bool) {
  40. if l := len(u.entries); l != 0 {
  41. return u.offset + uint64(l) - 1, true
  42. }
  43. if u.snapshot != nil {
  44. return u.snapshot.Metadata.Index, true
  45. }
  46. return 0, false
  47. }
  48. // myabeTerm returns the term of the entry at index i, if there
  49. // is any.
  50. func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
  51. if i < u.offset {
  52. if u.snapshot == nil {
  53. return 0, false
  54. }
  55. if u.snapshot.Metadata.Index == i {
  56. return u.snapshot.Metadata.Term, true
  57. }
  58. return 0, false
  59. }
  60. last, ok := u.maybeLastIndex()
  61. if !ok {
  62. return 0, false
  63. }
  64. if i > last {
  65. return 0, false
  66. }
  67. return u.entries[i-u.offset].Term, true
  68. }
  69. func (u *unstable) stableTo(i, t uint64) {
  70. gt, ok := u.maybeTerm(i)
  71. if !ok {
  72. return
  73. }
  74. // if i < offest, term is matched with the snapshot
  75. // only update the unstalbe entries if term is matched with
  76. // an unstable entry.
  77. if gt == t && i >= u.offset {
  78. u.entries = u.entries[i+1-u.offset:]
  79. u.offset = i + 1
  80. }
  81. }
  82. func (u *unstable) stableSnapTo(i uint64) {
  83. if u.snapshot != nil && u.snapshot.Metadata.Index == i {
  84. u.snapshot = nil
  85. }
  86. }
  87. func (u *unstable) restore(s pb.Snapshot) {
  88. u.offset = s.Metadata.Index + 1
  89. u.entries = nil
  90. u.snapshot = &s
  91. }
  92. func (u *unstable) truncateAndAppend(ents []pb.Entry) {
  93. after := ents[0].Index - 1
  94. switch {
  95. case after == u.offset+uint64(len(u.entries))-1:
  96. // after is the last index in the u.entries
  97. // directly append
  98. u.entries = append(u.entries, ents...)
  99. case after < u.offset:
  100. log.Printf("raftlog: replace the unstable entries from index %d", after+1)
  101. // The log is being truncated to before our current offset
  102. // portion, so set the offset and replace the entries
  103. u.offset = after + 1
  104. u.entries = ents
  105. default:
  106. // truncate to after and copy to u.entries
  107. // then append
  108. log.Printf("raftlog: truncate the unstable entries to index %d", after)
  109. u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
  110. u.entries = append(u.entries, ents...)
  111. }
  112. }
  113. func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
  114. u.mustCheckOutOfBounds(lo, hi)
  115. return u.entries[lo-u.offset : hi-u.offset]
  116. }
  117. // u.offset <= lo <= hi <= u.offset+len(u.offset)
  118. func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
  119. if lo > hi {
  120. log.Panicf("raft: invalid unstable.slice %d > %d", lo, hi)
  121. }
  122. upper := u.offset + uint64(len(u.entries))
  123. if lo < u.offset || hi > upper {
  124. log.Panicf("raft: unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
  125. }
  126. }