storage.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. pb "github.com/coreos/etcd/raft/raftpb"
  19. )
  var (
  	// ErrCompacted reports that a requested log index can no longer be
  	// served because compaction has moved it behind the latest snapshot.
  	// MemoryStorage returns it from both Entries and Term.
  	ErrCompacted = errors.New("requested index is unavailable due to compaction")
  )
  23. // Storage is an interface that may be implemented by the application
  24. // to retrieve log entries from storage.
  25. //
  26. // If any Storage method returns an error, the raft instance will
  27. // become inoperable and refuse to participate in elections; the
  28. // application is responsible for cleanup and recovery in this case.
  29. type Storage interface {
  30. // InitialState returns the saved HardState and ConfState information.
  31. InitialState() (pb.HardState, pb.ConfState, error)
  32. // Entries returns a slice of log entries in the range [lo,hi).
  33. Entries(lo, hi uint64) ([]pb.Entry, error)
  34. // Term returns the term of entry i, which must be in the range
  35. // [FirstIndex()-1, LastIndex()]. The term of the entry before
  36. // FirstIndex is retained for matching purposes even though the
  37. // rest of that entry may not be available.
  38. Term(i uint64) (uint64, error)
  39. // LastIndex returns the index of the last entry in the log.
  40. LastIndex() (uint64, error)
  41. // FirstIndex returns the index of the first log entry that is
  42. // available via Entries (older entries have been incorporated
  43. // into the latest Snapshot).
  44. FirstIndex() (uint64, error)
  45. // Snapshot returns the most recent snapshot.
  46. Snapshot() (pb.Snapshot, error)
  47. // ApplySnapshot overwrites the contents of this Storage object with
  48. // those of the given snapshot.
  49. ApplySnapshot(pb.Snapshot) error
  50. }
  51. // MemoryStorage implements the Storage interface backed by an
  52. // in-memory array.
  53. type MemoryStorage struct {
  54. // Protects access to all fields. Most methods of MemoryStorage are
  55. // run on the raft goroutine, but Append() is run on an application
  56. // goroutine.
  57. sync.Mutex
  58. hardState pb.HardState
  59. snapshot pb.Snapshot
  60. // ents[i] has raft log position i+snapshot.Metadata.Index
  61. ents []pb.Entry
  62. }
  63. // NewMemoryStorage creates an empty MemoryStorage.
  64. func NewMemoryStorage() *MemoryStorage {
  65. return &MemoryStorage{
  66. // When starting from scratch populate the list with a dummy entry at term zero.
  67. ents: make([]pb.Entry, 1),
  68. }
  69. }
  70. // InitialState implements the Storage interface.
  71. func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) {
  72. return ms.hardState, ms.snapshot.Metadata.ConfState, nil
  73. }
  74. // SetHardState saves the current HardState.
  75. func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
  76. ms.hardState = st
  77. return nil
  78. }
  79. // Entries implements the Storage interface.
  80. func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
  81. ms.Lock()
  82. defer ms.Unlock()
  83. offset := ms.snapshot.Metadata.Index
  84. if lo <= offset {
  85. return nil, ErrCompacted
  86. }
  87. return ms.ents[lo-offset : hi-offset], nil
  88. }
  89. // Term implements the Storage interface.
  90. func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
  91. ms.Lock()
  92. defer ms.Unlock()
  93. offset := ms.snapshot.Metadata.Index
  94. if i < offset {
  95. return 0, ErrCompacted
  96. }
  97. return ms.ents[i-offset].Term, nil
  98. }
  99. // LastIndex implements the Storage interface.
  100. func (ms *MemoryStorage) LastIndex() (uint64, error) {
  101. ms.Lock()
  102. defer ms.Unlock()
  103. return ms.snapshot.Metadata.Index + uint64(len(ms.ents)) - 1, nil
  104. }
  105. // FirstIndex implements the Storage interface.
  106. func (ms *MemoryStorage) FirstIndex() (uint64, error) {
  107. ms.Lock()
  108. defer ms.Unlock()
  109. return ms.snapshot.Metadata.Index + 1, nil
  110. }
  111. // Snapshot implements the Storage interface.
  112. func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
  113. ms.Lock()
  114. defer ms.Unlock()
  115. return ms.snapshot, nil
  116. }
  117. // ApplySnapshot implements the Storage interface.
  118. func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
  119. ms.Lock()
  120. defer ms.Unlock()
  121. ms.snapshot = snap
  122. ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
  123. return nil
  124. }
  125. // Compact discards all log entries prior to i. Creates a snapshot
  126. // which can be retrieved with the Snapshot() method and can be used
  127. // to reconstruct the state at that point.
  128. // If any configuration changes have been made since the last compaction,
  129. // the result of the last ApplyConfChange must be passed in.
  130. // It is the application's responsibility to not attempt to compact an index
  131. // greater than raftLog.applied.
  132. func (ms *MemoryStorage) Compact(i uint64, cs *pb.ConfState, data []byte) error {
  133. ms.Lock()
  134. defer ms.Unlock()
  135. offset := ms.snapshot.Metadata.Index
  136. if i <= offset || i > offset+uint64(len(ms.ents))-1 {
  137. panic(fmt.Sprintf("compact %d out of bounds (%d, %d)", i, offset,
  138. offset+uint64(len(ms.ents))-1))
  139. }
  140. i -= offset
  141. ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
  142. ents[0].Term = ms.ents[i].Term
  143. ents = append(ents, ms.ents[i+1:]...)
  144. ms.ents = ents
  145. ms.snapshot.Metadata.Index += i
  146. ms.snapshot.Metadata.Term = ents[0].Term
  147. if cs != nil {
  148. ms.snapshot.Metadata.ConfState = *cs
  149. }
  150. ms.snapshot.Data = data
  151. return nil
  152. }
  153. // Append the new entries to storage.
  154. func (ms *MemoryStorage) Append(entries []pb.Entry) {
  155. ms.Lock()
  156. defer ms.Unlock()
  157. if len(entries) == 0 {
  158. return
  159. }
  160. offset := entries[0].Index - ms.snapshot.Metadata.Index
  161. if uint64(len(ms.ents)) >= offset {
  162. ms.ents = ms.ents[:offset]
  163. }
  164. ms.ents = append(ms.ents, entries...)
  165. }