storage.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "log"
  17. "sync"
  18. pb "github.com/coreos/etcd/raft/raftpb"
  19. )
  // Sentinel errors reported by Storage implementations.
var (
	// ErrCompacted is returned by Storage.Entries/Compact when a requested
	// index is unavailable because it predates the last snapshot.
	ErrCompacted = errors.New("requested index is unavailable due to compaction")

	// ErrUnavailable is returned when a requested entry is not held by the
	// storage; for example, MemoryStorage.Entries returns it when the log
	// contains nothing but its dummy entry.
	ErrUnavailable = errors.New("requested entry at index is unavailable")
)
  24. // Storage is an interface that may be implemented by the application
  25. // to retrieve log entries from storage.
  26. //
  27. // If any Storage method returns an error, the raft instance will
  28. // become inoperable and refuse to participate in elections; the
  29. // application is responsible for cleanup and recovery in this case.
  30. type Storage interface {
  31. // InitialState returns the saved HardState and ConfState information.
  32. InitialState() (pb.HardState, pb.ConfState, error)
  33. // Entries returns a slice of log entries in the range [lo,hi).
  34. Entries(lo, hi uint64) ([]pb.Entry, error)
  35. // Term returns the term of entry i, which must be in the range
  36. // [FirstIndex()-1, LastIndex()]. The term of the entry before
  37. // FirstIndex is retained for matching purposes even though the
  38. // rest of that entry may not be available.
  39. Term(i uint64) (uint64, error)
  40. // LastIndex returns the index of the last entry in the log.
  41. LastIndex() (uint64, error)
  42. // FirstIndex returns the index of the first log entry that is
  43. // possibly available via Entries (older entries have been incorporated
  44. // into the latest Snapshot; if storage only contains the dummy entry the
  45. // first log entry is not available).
  46. FirstIndex() (uint64, error)
  47. // Snapshot returns the most recent snapshot.
  48. Snapshot() (pb.Snapshot, error)
  49. }
  50. // MemoryStorage implements the Storage interface backed by an
  51. // in-memory array.
  52. type MemoryStorage struct {
  53. // Protects access to all fields. Most methods of MemoryStorage are
  54. // run on the raft goroutine, but Append() is run on an application
  55. // goroutine.
  56. sync.Mutex
  57. hardState pb.HardState
  58. snapshot pb.Snapshot
  59. // ents[i] has raft log position i+snapshot.Metadata.Index
  60. ents []pb.Entry
  61. }
  62. // NewMemoryStorage creates an empty MemoryStorage.
  63. func NewMemoryStorage() *MemoryStorage {
  64. return &MemoryStorage{
  65. // When starting from scratch populate the list with a dummy entry at term zero.
  66. ents: make([]pb.Entry, 1),
  67. }
  68. }
  69. // InitialState implements the Storage interface.
  70. func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) {
  71. return ms.hardState, ms.snapshot.Metadata.ConfState, nil
  72. }
  73. // SetHardState saves the current HardState.
  74. func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
  75. ms.hardState = st
  76. return nil
  77. }
  78. // Entries implements the Storage interface.
  79. func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
  80. ms.Lock()
  81. defer ms.Unlock()
  82. offset := ms.snapshot.Metadata.Index
  83. if lo <= offset {
  84. return nil, ErrCompacted
  85. }
  86. // only contains dummy entries.
  87. if len(ms.ents) == 1 {
  88. return nil, ErrUnavailable
  89. }
  90. return ms.ents[lo-offset : hi-offset], nil
  91. }
  92. // Term implements the Storage interface.
  93. func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
  94. ms.Lock()
  95. defer ms.Unlock()
  96. offset := ms.snapshot.Metadata.Index
  97. if i < offset {
  98. return 0, ErrCompacted
  99. }
  100. return ms.ents[i-offset].Term, nil
  101. }
  102. // LastIndex implements the Storage interface.
  103. func (ms *MemoryStorage) LastIndex() (uint64, error) {
  104. ms.Lock()
  105. defer ms.Unlock()
  106. return ms.snapshot.Metadata.Index + uint64(len(ms.ents)) - 1, nil
  107. }
  108. // FirstIndex implements the Storage interface.
  109. func (ms *MemoryStorage) FirstIndex() (uint64, error) {
  110. ms.Lock()
  111. defer ms.Unlock()
  112. return ms.snapshot.Metadata.Index + 1, nil
  113. }
  114. // Snapshot implements the Storage interface.
  115. func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
  116. ms.Lock()
  117. defer ms.Unlock()
  118. return ms.snapshot, nil
  119. }
  120. // ApplySnapshot overwrites the contents of this Storage object with
  121. // those of the given snapshot.
  122. func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
  123. ms.Lock()
  124. defer ms.Unlock()
  125. ms.snapshot = snap
  126. ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
  127. return nil
  128. }
  129. // Compact discards all log entries prior to i. Creates a snapshot
  130. // which can be retrieved with the Snapshot() method and can be used
  131. // to reconstruct the state at that point.
  132. // If any configuration changes have been made since the last compaction,
  133. // the result of the last ApplyConfChange must be passed in.
  134. // It is the application's responsibility to not attempt to compact an index
  135. // greater than raftLog.applied.
  136. func (ms *MemoryStorage) Compact(i uint64, cs *pb.ConfState, data []byte) error {
  137. ms.Lock()
  138. defer ms.Unlock()
  139. offset := ms.snapshot.Metadata.Index
  140. if i <= offset {
  141. return ErrCompacted
  142. }
  143. if i > offset+uint64(len(ms.ents))-1 {
  144. log.Panicf("compact %d is out of bound lastindex(%d)", i, offset+uint64(len(ms.ents))-1)
  145. }
  146. i -= offset
  147. ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
  148. ents[0].Term = ms.ents[i].Term
  149. ents = append(ents, ms.ents[i+1:]...)
  150. ms.ents = ents
  151. ms.snapshot.Metadata.Index += i
  152. ms.snapshot.Metadata.Term = ents[0].Term
  153. if cs != nil {
  154. ms.snapshot.Metadata.ConfState = *cs
  155. }
  156. ms.snapshot.Data = data
  157. return nil
  158. }
  159. // Append the new entries to storage.
  160. func (ms *MemoryStorage) Append(entries []pb.Entry) error {
  161. ms.Lock()
  162. defer ms.Unlock()
  163. if len(entries) == 0 {
  164. return nil
  165. }
  166. first := ms.snapshot.Metadata.Index + 1
  167. last := entries[0].Index + uint64(len(entries)) - 1
  168. // shortcut if there is no new entry.
  169. if last < first {
  170. return nil
  171. }
  172. // truncate old entries
  173. if first > entries[0].Index {
  174. entries = entries[first-entries[0].Index:]
  175. }
  176. offset := entries[0].Index - ms.snapshot.Metadata.Index
  177. switch {
  178. case uint64(len(ms.ents)) > offset:
  179. ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
  180. ms.ents = append(ms.ents, entries...)
  181. case uint64(len(ms.ents)) == offset:
  182. ms.ents = append(ms.ents, entries...)
  183. default:
  184. log.Panicf("missing log entry [last: %d, append at: %d]",
  185. ms.snapshot.Metadata.Index+uint64(len(ms.ents)), entries[0].Index)
  186. }
  187. return nil
  188. }