123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package raft
- import pb "go.etcd.io/etcd/raft/raftpb"
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
	// the incoming unstable snapshot, if any.
	snapshot *pb.Snapshot
	// all entries that have not yet been written to storage.
	entries []pb.Entry
	// offset is the raft log position of entries[0]. restore sets it to
	// the snapshot index + 1, and stableTo advances it past the entries
	// it drops.
	offset uint64

	// logger receives the informational messages emitted while
	// truncating/replacing entries and the Panicf calls made when a
	// slice request is out of bounds.
	logger Logger
}
- // maybeFirstIndex returns the index of the first possible entry in entries
- // if it has a snapshot.
- func (u *unstable) maybeFirstIndex() (uint64, bool) {
- if u.snapshot != nil {
- return u.snapshot.Metadata.Index + 1, true
- }
- return 0, false
- }
- // maybeLastIndex returns the last index if it has at least one
- // unstable entry or snapshot.
- func (u *unstable) maybeLastIndex() (uint64, bool) {
- if l := len(u.entries); l != 0 {
- return u.offset + uint64(l) - 1, true
- }
- if u.snapshot != nil {
- return u.snapshot.Metadata.Index, true
- }
- return 0, false
- }
- // maybeTerm returns the term of the entry at index i, if there
- // is any.
- func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
- if i < u.offset {
- if u.snapshot != nil && u.snapshot.Metadata.Index == i {
- return u.snapshot.Metadata.Term, true
- }
- return 0, false
- }
- last, ok := u.maybeLastIndex()
- if !ok {
- return 0, false
- }
- if i > last {
- return 0, false
- }
- return u.entries[i-u.offset].Term, true
- }
- func (u *unstable) stableTo(i, t uint64) {
- gt, ok := u.maybeTerm(i)
- if !ok {
- return
- }
- // if i < offset, term is matched with the snapshot
- // only update the unstable entries if term is matched with
- // an unstable entry.
- if gt == t && i >= u.offset {
- u.entries = u.entries[i+1-u.offset:]
- u.offset = i + 1
- u.shrinkEntriesArray()
- }
- }
- // shrinkEntriesArray discards the underlying array used by the entries slice
- // if most of it isn't being used. This avoids holding references to a bunch of
- // potentially large entries that aren't needed anymore. Simply clearing the
- // entries wouldn't be safe because clients might still be using them.
- func (u *unstable) shrinkEntriesArray() {
- // We replace the array if we're using less than half of the space in
- // it. This number is fairly arbitrary, chosen as an attempt to balance
- // memory usage vs number of allocations. It could probably be improved
- // with some focused tuning.
- const lenMultiple = 2
- if len(u.entries) == 0 {
- u.entries = nil
- } else if len(u.entries)*lenMultiple < cap(u.entries) {
- newEntries := make([]pb.Entry, len(u.entries))
- copy(newEntries, u.entries)
- u.entries = newEntries
- }
- }
- func (u *unstable) stableSnapTo(i uint64) {
- if u.snapshot != nil && u.snapshot.Metadata.Index == i {
- u.snapshot = nil
- }
- }
- func (u *unstable) restore(s pb.Snapshot) {
- u.offset = s.Metadata.Index + 1
- u.entries = nil
- u.snapshot = &s
- }
- func (u *unstable) truncateAndAppend(ents []pb.Entry) {
- after := ents[0].Index
- switch {
- case after == u.offset+uint64(len(u.entries)):
- // after is the next index in the u.entries
- // directly append
- u.entries = append(u.entries, ents...)
- case after <= u.offset:
- u.logger.Infof("replace the unstable entries from index %d", after)
- // The log is being truncated to before our current offset
- // portion, so set the offset and replace the entries
- u.offset = after
- u.entries = ents
- default:
- // truncate to after and copy to u.entries
- // then append
- u.logger.Infof("truncate the unstable entries before index %d", after)
- u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
- u.entries = append(u.entries, ents...)
- }
- }
- func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
- u.mustCheckOutOfBounds(lo, hi)
- return u.entries[lo-u.offset : hi-u.offset]
- }
- // u.offset <= lo <= hi <= u.offset+len(u.entries)
- func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
- if lo > hi {
- u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
- }
- upper := u.offset + uint64(len(u.entries))
- if lo < u.offset || hi > upper {
- u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
- }
- }
|