wal.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wal
  14. import (
  15. "errors"
  16. "fmt"
  17. "hash/crc32"
  18. "io"
  19. "log"
  20. "os"
  21. "path"
  22. "reflect"
  23. "github.com/coreos/etcd/pkg/fileutil"
  24. "github.com/coreos/etcd/pkg/pbutil"
  25. "github.com/coreos/etcd/raft"
  26. "github.com/coreos/etcd/raft/raftpb"
  27. "github.com/coreos/etcd/wal/walpb"
  28. )
  29. const (
  30. metadataType int64 = iota + 1
  31. entryType
  32. stateType
  33. crcType
  34. snapshotType
  35. // the owner can make/remove files inside the directory
  36. privateDirMode = 0700
  37. )
  38. var (
  39. ErrMetadataConflict = errors.New("wal: conflicting metadata found")
  40. ErrFileNotFound = errors.New("wal: file not found")
  41. ErrCRCMismatch = errors.New("wal: crc mismatch")
  42. ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
  43. ErrSnapshotNotFound = errors.New("wal: snapshot not found")
  44. crcTable = crc32.MakeTable(crc32.Castagnoli)
  45. )
// WAL is a logical representation of the stable storage.
// WAL is either in read mode or append mode but not both.
// A newly created WAL is in append mode, and ready for appending records.
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
	// dir is the directory that holds the underlying WAL files.
	dir string
	// metadata is the metadata recorded at the head of each WAL file.
	metadata []byte
	// state is the most recent non-empty hard state passed to SaveState;
	// Cut writes it again at the head of each new WAL file.
	state raftpb.HardState

	// start is the snapshot to start reading from.
	start walpb.Snapshot
	// decoder decodes records while the WAL is in read mode; ReadAll sets
	// it to nil once reading is finished.
	decoder *decoder

	// f is the underlying file opened for appending and syncing.
	f *os.File
	// seq is the sequence number of the WAL file currently used for writes.
	seq uint64
	// enti is the index of the last entry saved to the WAL.
	enti uint64
	// encoder encodes records onto f while the WAL is in append mode.
	encoder *encoder

	// locks are the file locks the WAL is holding (the name is increasing).
	locks []fileutil.Lock
}
  63. // Create creates a WAL ready for appending records. The given metadata is
  64. // recorded at the head of each WAL file, and can be retrieved with ReadAll.
  65. func Create(dirpath string, metadata []byte) (*WAL, error) {
  66. if Exist(dirpath) {
  67. return nil, os.ErrExist
  68. }
  69. if err := os.MkdirAll(dirpath, privateDirMode); err != nil {
  70. return nil, err
  71. }
  72. p := path.Join(dirpath, walName(0, 0))
  73. f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  74. if err != nil {
  75. return nil, err
  76. }
  77. l, err := fileutil.NewLock(f.Name())
  78. if err != nil {
  79. return nil, err
  80. }
  81. err = l.Lock()
  82. if err != nil {
  83. return nil, err
  84. }
  85. w := &WAL{
  86. dir: dirpath,
  87. metadata: metadata,
  88. seq: 0,
  89. f: f,
  90. encoder: newEncoder(f, 0),
  91. }
  92. w.locks = append(w.locks, l)
  93. if err := w.saveCrc(0); err != nil {
  94. return nil, err
  95. }
  96. if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
  97. return nil, err
  98. }
  99. if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
  100. return nil, err
  101. }
  102. return w, nil
  103. }
  104. // Open opens the WAL at the given snap.
  105. // The snap SHOULD have been previously saved to the WAL, or the following
  106. // ReadAll will fail.
  107. // The returned WAL is ready to read and the first record will be the one after
  108. // the given snap. The WAL cannot be appended to before reading out all of its
  109. // previous records.
  110. func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
  111. return openAtIndex(dirpath, snap, true)
  112. }
  113. // OpenNotInUse only opens the wal files that are not in use.
  114. // Other than that, it is similar to Open.
  115. func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) {
  116. return openAtIndex(dirpath, snap, false)
  117. }
  118. func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
  119. names, err := fileutil.ReadDir(dirpath)
  120. if err != nil {
  121. return nil, err
  122. }
  123. names = checkWalNames(names)
  124. if len(names) == 0 {
  125. return nil, ErrFileNotFound
  126. }
  127. nameIndex, ok := searchIndex(names, snap.Index)
  128. if !ok || !isValidSeq(names[nameIndex:]) {
  129. return nil, ErrFileNotFound
  130. }
  131. // open the wal files for reading
  132. rcs := make([]io.ReadCloser, 0)
  133. ls := make([]fileutil.Lock, 0)
  134. for _, name := range names[nameIndex:] {
  135. f, err := os.Open(path.Join(dirpath, name))
  136. if err != nil {
  137. return nil, err
  138. }
  139. l, err := fileutil.NewLock(f.Name())
  140. if err != nil {
  141. return nil, err
  142. }
  143. err = l.TryLock()
  144. if err != nil {
  145. if all {
  146. return nil, err
  147. } else {
  148. log.Printf("wal: opened all the files until %s, since it is still in use by an etcd server", name)
  149. break
  150. }
  151. }
  152. rcs = append(rcs, f)
  153. ls = append(ls, l)
  154. }
  155. rc := MultiReadCloser(rcs...)
  156. // open the lastest wal file for appending
  157. seq, _, err := parseWalName(names[len(names)-1])
  158. if err != nil {
  159. rc.Close()
  160. return nil, err
  161. }
  162. last := path.Join(dirpath, names[len(names)-1])
  163. f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
  164. if err != nil {
  165. rc.Close()
  166. return nil, err
  167. }
  168. // create a WAL ready for reading
  169. w := &WAL{
  170. dir: dirpath,
  171. start: snap,
  172. decoder: newDecoder(rc),
  173. f: f,
  174. seq: seq,
  175. locks: ls,
  176. }
  177. return w, nil
  178. }
  179. // ReadAll reads out all records of the current WAL.
  180. // If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
  181. // If loaded snap doesn't match with the expected one, it will return
  182. // ErrSnapshotMismatch.
  183. // TODO: detect not-last-snap error.
  184. // TODO: maybe loose the checking of match.
  185. // After ReadAll, the WAL will be ready for appending new records.
  186. func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
  187. rec := &walpb.Record{}
  188. decoder := w.decoder
  189. var match bool
  190. for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
  191. switch rec.Type {
  192. case entryType:
  193. e := mustUnmarshalEntry(rec.Data)
  194. if e.Index > w.start.Index {
  195. ents = append(ents[:e.Index-w.start.Index-1], e)
  196. }
  197. w.enti = e.Index
  198. case stateType:
  199. state = mustUnmarshalState(rec.Data)
  200. case metadataType:
  201. if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) {
  202. state.Reset()
  203. return nil, state, nil, ErrMetadataConflict
  204. }
  205. metadata = rec.Data
  206. case crcType:
  207. crc := decoder.crc.Sum32()
  208. // current crc of decoder must match the crc of the record.
  209. // do no need to match 0 crc, since the decoder is a new one at this case.
  210. if crc != 0 && rec.Validate(crc) != nil {
  211. state.Reset()
  212. return nil, state, nil, ErrCRCMismatch
  213. }
  214. decoder.updateCRC(rec.Crc)
  215. case snapshotType:
  216. var snap walpb.Snapshot
  217. pbutil.MustUnmarshal(&snap, rec.Data)
  218. if snap.Index == w.start.Index {
  219. if snap.Term != w.start.Term {
  220. state.Reset()
  221. return nil, state, nil, ErrSnapshotMismatch
  222. }
  223. match = true
  224. }
  225. default:
  226. state.Reset()
  227. return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
  228. }
  229. }
  230. if err != io.EOF {
  231. state.Reset()
  232. return nil, state, nil, err
  233. }
  234. if !match {
  235. state.Reset()
  236. return nil, state, nil, ErrSnapshotNotFound
  237. }
  238. // close decoder, disable reading
  239. w.decoder.close()
  240. w.start = walpb.Snapshot{}
  241. w.metadata = metadata
  242. // create encoder (chain crc with the decoder), enable appending
  243. w.encoder = newEncoder(w.f, w.decoder.lastCRC())
  244. w.decoder = nil
  245. return metadata, state, ents, nil
  246. }
  247. // Cut closes current file written and creates a new one ready to append.
  248. func (w *WAL) Cut() error {
  249. // create a new wal file with name sequence + 1
  250. fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
  251. f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  252. if err != nil {
  253. return err
  254. }
  255. l, err := fileutil.NewLock(f.Name())
  256. if err != nil {
  257. return err
  258. }
  259. err = l.Lock()
  260. if err != nil {
  261. return err
  262. }
  263. w.locks = append(w.locks, l)
  264. if err = w.sync(); err != nil {
  265. return err
  266. }
  267. w.f.Close()
  268. // update writer and save the previous crc
  269. w.f = f
  270. w.seq++
  271. prevCrc := w.encoder.crc.Sum32()
  272. w.encoder = newEncoder(w.f, prevCrc)
  273. if err := w.saveCrc(prevCrc); err != nil {
  274. return err
  275. }
  276. if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
  277. return err
  278. }
  279. if err := w.SaveState(&w.state); err != nil {
  280. return err
  281. }
  282. return w.sync()
  283. }
  284. func (w *WAL) sync() error {
  285. if w.encoder != nil {
  286. if err := w.encoder.flush(); err != nil {
  287. return err
  288. }
  289. }
  290. return w.f.Sync()
  291. }
  292. // ReleaseLockTo releases the locks w is holding, which
  293. // have index smaller or equal to the given index.
  294. func (w *WAL) ReleaseLockTo(index uint64) error {
  295. for _, l := range w.locks {
  296. _, i, err := parseWalName(path.Base(l.Name()))
  297. if err != nil {
  298. return err
  299. }
  300. if i > index {
  301. return nil
  302. }
  303. err = l.Unlock()
  304. if err != nil {
  305. return err
  306. }
  307. err = l.Destroy()
  308. if err != nil {
  309. return err
  310. }
  311. w.locks = w.locks[1:]
  312. }
  313. return nil
  314. }
  315. func (w *WAL) Close() error {
  316. if w.f != nil {
  317. if err := w.sync(); err != nil {
  318. return err
  319. }
  320. if err := w.f.Close(); err != nil {
  321. return err
  322. }
  323. }
  324. for _, l := range w.locks {
  325. // TODO: log the error
  326. l.Unlock()
  327. l.Destroy()
  328. }
  329. return nil
  330. }
  331. func (w *WAL) SaveEntry(e *raftpb.Entry) error {
  332. b := pbutil.MustMarshal(e)
  333. rec := &walpb.Record{Type: entryType, Data: b}
  334. if err := w.encoder.encode(rec); err != nil {
  335. return err
  336. }
  337. w.enti = e.Index
  338. return nil
  339. }
  340. func (w *WAL) SaveState(s *raftpb.HardState) error {
  341. if raft.IsEmptyHardState(*s) {
  342. return nil
  343. }
  344. w.state = *s
  345. b := pbutil.MustMarshal(s)
  346. rec := &walpb.Record{Type: stateType, Data: b}
  347. return w.encoder.encode(rec)
  348. }
  349. func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
  350. // TODO(xiangli): no more reference operator
  351. if err := w.SaveState(&st); err != nil {
  352. return err
  353. }
  354. for i := range ents {
  355. if err := w.SaveEntry(&ents[i]); err != nil {
  356. return err
  357. }
  358. }
  359. return w.sync()
  360. }
  361. func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
  362. b := pbutil.MustMarshal(&e)
  363. rec := &walpb.Record{Type: snapshotType, Data: b}
  364. if err := w.encoder.encode(rec); err != nil {
  365. return err
  366. }
  367. // update enti only when snapshot is ahead of last index
  368. if w.enti < e.Index {
  369. w.enti = e.Index
  370. }
  371. return w.sync()
  372. }
  373. func (w *WAL) saveCrc(prevCrc uint32) error {
  374. return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
  375. }