wal.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wal
  14. import (
  15. "errors"
  16. "fmt"
  17. "hash/crc32"
  18. "io"
  19. "log"
  20. "os"
  21. "path"
  22. "reflect"
  23. "sort"
  24. "github.com/coreos/etcd/pkg/fileutil"
  25. "github.com/coreos/etcd/pkg/pbutil"
  26. "github.com/coreos/etcd/raft"
  27. "github.com/coreos/etcd/raft/raftpb"
  28. "github.com/coreos/etcd/wal/walpb"
  29. )
  30. const (
  31. metadataType int64 = iota + 1
  32. entryType
  33. stateType
  34. crcType
  35. // the owner can make/remove files inside the directory
  36. privateDirMode = 0700
  37. )
  38. var (
  39. ErrMetadataConflict = errors.New("wal: conflicting metadata found")
  40. ErrFileNotFound = errors.New("wal: file not found")
  41. ErrCRCMismatch = errors.New("wal: crc mismatch")
  42. crcTable = crc32.MakeTable(crc32.Castagnoli)
  43. )
  44. // WAL is a logical repersentation of the stable storage.
  45. // WAL is either in read mode or append mode but not both.
  46. // A newly created WAL is in append mode, and ready for appending records.
  47. // A just opened WAL is in read mode, and ready for reading records.
  48. // The WAL will be ready for appending after reading out all the previous records.
  49. type WAL struct {
  50. dir string // the living directory of the underlay files
  51. metadata []byte // metadata recorded at the head of each WAL
  52. state raftpb.HardState // hardstate recorded at the head of WAL
  53. ri uint64 // index of entry to start reading
  54. decoder *decoder // decoder to decode records
  55. f *os.File // underlay file opened for appending, sync
  56. seq uint64 // sequence of the wal file currently used for writes
  57. enti uint64 // index of the last entry saved to the wal
  58. encoder *encoder // encoder to encode records
  59. locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
  60. }
  61. // Create creates a WAL ready for appending records. The given metadata is
  62. // recorded at the head of each WAL file, and can be retrieved with ReadAll.
  63. func Create(dirpath string, metadata []byte) (*WAL, error) {
  64. if Exist(dirpath) {
  65. return nil, os.ErrExist
  66. }
  67. if err := os.MkdirAll(dirpath, privateDirMode); err != nil {
  68. return nil, err
  69. }
  70. p := path.Join(dirpath, walName(0, 0))
  71. f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  72. if err != nil {
  73. return nil, err
  74. }
  75. l, err := fileutil.NewLock(f.Name())
  76. if err != nil {
  77. return nil, err
  78. }
  79. err = l.Lock()
  80. if err != nil {
  81. return nil, err
  82. }
  83. w := &WAL{
  84. dir: dirpath,
  85. metadata: metadata,
  86. seq: 0,
  87. f: f,
  88. encoder: newEncoder(f, 0),
  89. }
  90. w.locks = append(w.locks, l)
  91. if err := w.saveCrc(0); err != nil {
  92. return nil, err
  93. }
  94. if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
  95. return nil, err
  96. }
  97. if err = w.sync(); err != nil {
  98. return nil, err
  99. }
  100. return w, nil
  101. }
  102. // Open opens the WAL at the given index.
  103. // The index SHOULD have been previously committed to the WAL, or the following
  104. // ReadAll will fail.
  105. // The returned WAL is ready to read and the first record will be the given
  106. // index. The WAL cannot be appended to before reading out all of its
  107. // previous records.
  108. func Open(dirpath string, index uint64) (*WAL, error) {
  109. return openAtIndex(dirpath, index, true)
  110. }
  111. // OpenNotInUse only opens the wal files that are not in use.
  112. // Other than that, it is similar to Open.
  113. func OpenNotInUse(dirpath string, index uint64) (*WAL, error) {
  114. return openAtIndex(dirpath, index, false)
  115. }
  116. func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
  117. names, err := fileutil.ReadDir(dirpath)
  118. if err != nil {
  119. return nil, err
  120. }
  121. names = checkWalNames(names)
  122. if len(names) == 0 {
  123. return nil, ErrFileNotFound
  124. }
  125. sort.Sort(sort.StringSlice(names))
  126. nameIndex, ok := searchIndex(names, index)
  127. if !ok || !isValidSeq(names[nameIndex:]) {
  128. return nil, ErrFileNotFound
  129. }
  130. // open the wal files for reading
  131. rcs := make([]io.ReadCloser, 0)
  132. ls := make([]fileutil.Lock, 0)
  133. for _, name := range names[nameIndex:] {
  134. f, err := os.Open(path.Join(dirpath, name))
  135. if err != nil {
  136. return nil, err
  137. }
  138. l, err := fileutil.NewLock(f.Name())
  139. if err != nil {
  140. return nil, err
  141. }
  142. err = l.TryLock()
  143. if err != nil {
  144. if all {
  145. return nil, err
  146. } else {
  147. log.Printf("wal: opened all the files until %s, since it is still in use by an etcd server", name)
  148. break
  149. }
  150. }
  151. rcs = append(rcs, f)
  152. ls = append(ls, l)
  153. }
  154. rc := MultiReadCloser(rcs...)
  155. // open the lastest wal file for appending
  156. seq, _, err := parseWalName(names[len(names)-1])
  157. if err != nil {
  158. rc.Close()
  159. return nil, err
  160. }
  161. last := path.Join(dirpath, names[len(names)-1])
  162. f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
  163. if err != nil {
  164. rc.Close()
  165. return nil, err
  166. }
  167. // create a WAL ready for reading
  168. w := &WAL{
  169. dir: dirpath,
  170. ri: index,
  171. decoder: newDecoder(rc),
  172. f: f,
  173. seq: seq,
  174. locks: ls,
  175. }
  176. return w, nil
  177. }
  178. // ReadAll reads out all records of the current WAL.
  179. // If it cannot read out the expected entry, it will return ErrIndexNotFound.
  180. // After ReadAll, the WAL will be ready for appending new records.
  181. func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
  182. rec := &walpb.Record{}
  183. decoder := w.decoder
  184. for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
  185. switch rec.Type {
  186. case entryType:
  187. e := mustUnmarshalEntry(rec.Data)
  188. if e.Index >= w.ri {
  189. ents = append(ents[:e.Index-w.ri], e)
  190. }
  191. w.enti = e.Index
  192. case stateType:
  193. state = mustUnmarshalState(rec.Data)
  194. case metadataType:
  195. if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) {
  196. state.Reset()
  197. return nil, state, nil, ErrMetadataConflict
  198. }
  199. metadata = rec.Data
  200. case crcType:
  201. crc := decoder.crc.Sum32()
  202. // current crc of decoder must match the crc of the record.
  203. // do no need to match 0 crc, since the decoder is a new one at this case.
  204. if crc != 0 && rec.Validate(crc) != nil {
  205. state.Reset()
  206. return nil, state, nil, ErrCRCMismatch
  207. }
  208. decoder.updateCRC(rec.Crc)
  209. default:
  210. state.Reset()
  211. return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
  212. }
  213. }
  214. if err != io.EOF {
  215. state.Reset()
  216. return nil, state, nil, err
  217. }
  218. // close decoder, disable reading
  219. w.decoder.close()
  220. w.ri = 0
  221. w.metadata = metadata
  222. // create encoder (chain crc with the decoder), enable appending
  223. w.encoder = newEncoder(w.f, w.decoder.lastCRC())
  224. w.decoder = nil
  225. return metadata, state, ents, nil
  226. }
  227. // Cut closes current file written and creates a new one ready to append.
  228. func (w *WAL) Cut() error {
  229. // create a new wal file with name sequence + 1
  230. fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
  231. f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  232. if err != nil {
  233. return err
  234. }
  235. l, err := fileutil.NewLock(f.Name())
  236. if err != nil {
  237. return err
  238. }
  239. err = l.Lock()
  240. if err != nil {
  241. return err
  242. }
  243. w.locks = append(w.locks, l)
  244. if err = w.sync(); err != nil {
  245. return err
  246. }
  247. w.f.Close()
  248. // update writer and save the previous crc
  249. w.f = f
  250. w.seq++
  251. prevCrc := w.encoder.crc.Sum32()
  252. w.encoder = newEncoder(w.f, prevCrc)
  253. if err := w.saveCrc(prevCrc); err != nil {
  254. return err
  255. }
  256. if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
  257. return err
  258. }
  259. if err := w.SaveState(&w.state); err != nil {
  260. return err
  261. }
  262. return w.sync()
  263. }
  264. func (w *WAL) sync() error {
  265. if w.encoder != nil {
  266. if err := w.encoder.flush(); err != nil {
  267. return err
  268. }
  269. }
  270. return w.f.Sync()
  271. }
  272. // ReleaseLockTo releases the locks w is holding, which
  273. // have index smaller or equal to the given index.
  274. func (w *WAL) ReleaseLockTo(index uint64) error {
  275. for _, l := range w.locks {
  276. _, i, err := parseWalName(path.Base(l.Name()))
  277. if err != nil {
  278. return err
  279. }
  280. if i > index {
  281. return nil
  282. }
  283. err = l.Unlock()
  284. if err != nil {
  285. return err
  286. }
  287. err = l.Destroy()
  288. if err != nil {
  289. return err
  290. }
  291. w.locks = w.locks[1:]
  292. }
  293. return nil
  294. }
  295. func (w *WAL) Close() error {
  296. if w.f != nil {
  297. if err := w.sync(); err != nil {
  298. return err
  299. }
  300. if err := w.f.Close(); err != nil {
  301. return err
  302. }
  303. }
  304. for _, l := range w.locks {
  305. // TODO: log the error
  306. l.Unlock()
  307. l.Destroy()
  308. }
  309. return nil
  310. }
  311. func (w *WAL) SaveEntry(e *raftpb.Entry) error {
  312. b := pbutil.MustMarshal(e)
  313. rec := &walpb.Record{Type: entryType, Data: b}
  314. if err := w.encoder.encode(rec); err != nil {
  315. return err
  316. }
  317. w.enti = e.Index
  318. return nil
  319. }
  320. func (w *WAL) SaveState(s *raftpb.HardState) error {
  321. if raft.IsEmptyHardState(*s) {
  322. return nil
  323. }
  324. w.state = *s
  325. b := pbutil.MustMarshal(s)
  326. rec := &walpb.Record{Type: stateType, Data: b}
  327. return w.encoder.encode(rec)
  328. }
  329. func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
  330. // TODO(xiangli): no more reference operator
  331. if err := w.SaveState(&st); err != nil {
  332. return err
  333. }
  334. for i := range ents {
  335. if err := w.SaveEntry(&ents[i]); err != nil {
  336. return err
  337. }
  338. }
  339. return w.sync()
  340. }
  341. func (w *WAL) saveCrc(prevCrc uint32) error {
  342. return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
  343. }