wal.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wal
  14. import (
  15. "bufio"
  16. "bytes"
  17. "fmt"
  18. "io"
  19. "log"
  20. "os"
  21. "path"
  22. "sort"
  23. "github.com/coreos/etcd/raft"
  24. )
  25. const (
  26. infoType int64 = iota + 1
  27. entryType
  28. stateType
  29. )
  30. var (
  31. ErrIdMismatch = fmt.Errorf("unmatch id")
  32. ErrNotFound = fmt.Errorf("wal file is not found")
  33. )
  34. type WAL struct {
  35. f *os.File
  36. bw *bufio.Writer
  37. buf *bytes.Buffer
  38. }
  39. func newWAL(f *os.File) *WAL {
  40. return &WAL{f, bufio.NewWriter(f), new(bytes.Buffer)}
  41. }
  42. func Exist(dirpath string) bool {
  43. names, err := readDir(dirpath)
  44. if err != nil {
  45. return false
  46. }
  47. return len(names) != 0
  48. }
  49. func Create(dirpath string) (*WAL, error) {
  50. log.Printf("path=%s wal.create", dirpath)
  51. if Exist(dirpath) {
  52. return nil, os.ErrExist
  53. }
  54. p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
  55. f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return newWAL(f), nil
  60. }
  61. func Open(dirpath string) (*WAL, error) {
  62. log.Printf("path=%s wal.append", dirpath)
  63. names, err := readDir(dirpath)
  64. if err != nil {
  65. return nil, err
  66. }
  67. names = checkWalNames(names)
  68. if len(names) == 0 {
  69. return nil, ErrNotFound
  70. }
  71. name := names[len(names)-1]
  72. p := path.Join(dirpath, name)
  73. f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND, 0)
  74. if err != nil {
  75. return nil, err
  76. }
  77. return newWAL(f), nil
  78. }
  79. // index should be the index of last log entry currently.
  80. // Cut closes current file written and creates a new one to append.
  81. func (w *WAL) Cut(index int64) error {
  82. log.Printf("path=%s wal.cut index=%d", w.f.Name(), index)
  83. fpath := w.f.Name()
  84. seq, _, err := parseWalName(path.Base(fpath))
  85. if err != nil {
  86. panic("parse correct name error")
  87. }
  88. fpath = path.Join(path.Dir(fpath), fmt.Sprintf("%016x-%016x.wal", seq+1, index))
  89. f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  90. if err != nil {
  91. return err
  92. }
  93. w.Sync()
  94. w.f.Close()
  95. w.f = f
  96. w.bw = bufio.NewWriter(f)
  97. return nil
  98. }
  99. func (w *WAL) Sync() error {
  100. if err := w.bw.Flush(); err != nil {
  101. return err
  102. }
  103. return w.f.Sync()
  104. }
  105. func (w *WAL) Close() {
  106. log.Printf("path=%s wal.close", w.f.Name())
  107. if w.f != nil {
  108. w.Sync()
  109. w.f.Close()
  110. }
  111. }
  112. func (w *WAL) SaveInfo(i *raft.Info) error {
  113. log.Printf("path=%s wal.saveInfo id=%d", w.f.Name(), i.Id)
  114. if err := w.checkAtHead(); err != nil {
  115. return err
  116. }
  117. b, err := i.Marshal()
  118. if err != nil {
  119. panic(err)
  120. }
  121. rec := &Record{Type: infoType, Data: b}
  122. return writeRecord(w.bw, rec)
  123. }
  124. func (w *WAL) SaveEntry(e *raft.Entry) error {
  125. b, err := e.Marshal()
  126. if err != nil {
  127. panic(err)
  128. }
  129. rec := &Record{Type: entryType, Data: b}
  130. return writeRecord(w.bw, rec)
  131. }
  132. func (w *WAL) SaveState(s *raft.State) error {
  133. log.Printf("path=%s wal.saveState state=\"%+v\"", w.f.Name(), s)
  134. b, err := s.Marshal()
  135. if err != nil {
  136. panic(err)
  137. }
  138. rec := &Record{Type: stateType, Data: b}
  139. return writeRecord(w.bw, rec)
  140. }
  141. func (w *WAL) checkAtHead() error {
  142. o, err := w.f.Seek(0, os.SEEK_CUR)
  143. if err != nil {
  144. return err
  145. }
  146. if o != 0 || w.bw.Buffered() != 0 {
  147. return fmt.Errorf("cannot write info at %d, expect 0", max(o, int64(w.bw.Buffered())))
  148. }
  149. return nil
  150. }
  151. type Node struct {
  152. Id int64
  153. Ents []raft.Entry
  154. State raft.State
  155. // index of the first entry
  156. index int64
  157. }
  158. func newNode(index int64) *Node {
  159. return &Node{Ents: make([]raft.Entry, 0), index: index + 1}
  160. }
  161. func (n *Node) load(path string) error {
  162. f, err := os.Open(path)
  163. if err != nil {
  164. return err
  165. }
  166. defer f.Close()
  167. br := bufio.NewReader(f)
  168. rec := &Record{}
  169. err = readRecord(br, rec)
  170. if err != nil {
  171. return err
  172. }
  173. if rec.Type != infoType {
  174. return fmt.Errorf("the first block of wal is not infoType but %d", rec.Type)
  175. }
  176. i, err := loadInfo(rec.Data)
  177. if err != nil {
  178. return err
  179. }
  180. if n.Id != 0 && n.Id != i.Id {
  181. return ErrIdMismatch
  182. }
  183. n.Id = i.Id
  184. for err = readRecord(br, rec); err == nil; err = readRecord(br, rec) {
  185. switch rec.Type {
  186. case entryType:
  187. e, err := loadEntry(rec.Data)
  188. if err != nil {
  189. return err
  190. }
  191. if e.Index >= n.index {
  192. n.Ents = append(n.Ents[:e.Index-n.index], e)
  193. }
  194. case stateType:
  195. s, err := loadState(rec.Data)
  196. if err != nil {
  197. return err
  198. }
  199. n.State = s
  200. default:
  201. return fmt.Errorf("unexpected block type %d", rec.Type)
  202. }
  203. }
  204. if err != io.EOF {
  205. return err
  206. }
  207. return nil
  208. }
  209. func (n *Node) startFrom(index int64) error {
  210. diff := int(index - n.index)
  211. if diff > len(n.Ents) {
  212. return ErrNotFound
  213. }
  214. n.Ents = n.Ents[diff:]
  215. return nil
  216. }
  217. // Read loads all entries after index (index is not included).
  218. func Read(dirpath string, index int64) (*Node, error) {
  219. log.Printf("path=%s wal.load index=%d", dirpath, index)
  220. names, err := readDir(dirpath)
  221. if err != nil {
  222. return nil, err
  223. }
  224. names = checkWalNames(names)
  225. if len(names) == 0 {
  226. return nil, ErrNotFound
  227. }
  228. sort.Sort(sort.StringSlice(names))
  229. nameIndex, ok := searchIndex(names, index)
  230. if !ok || !isValidSeq(names[nameIndex:]) {
  231. return nil, ErrNotFound
  232. }
  233. _, initIndex, err := parseWalName(names[nameIndex])
  234. if err != nil {
  235. panic("parse correct name error")
  236. }
  237. n := newNode(initIndex)
  238. for _, name := range names[nameIndex:] {
  239. if err := n.load(path.Join(dirpath, name)); err != nil {
  240. return nil, err
  241. }
  242. }
  243. if err := n.startFrom(index + 1); err != nil {
  244. return nil, ErrNotFound
  245. }
  246. return n, nil
  247. }
  248. // The input names should be sorted.
  249. // serachIndex returns the array index of the last name that has
  250. // a smaller raft index section than the given raft index.
  251. func searchIndex(names []string, index int64) (int, bool) {
  252. for i := len(names) - 1; i >= 0; i-- {
  253. name := names[i]
  254. _, curIndex, err := parseWalName(name)
  255. if err != nil {
  256. panic("parse correct name error")
  257. }
  258. if index >= curIndex {
  259. return i, true
  260. }
  261. }
  262. return -1, false
  263. }
  264. // names should have been sorted based on sequence number.
  265. // isValidSeq checks whether seq increases continuously.
  266. func isValidSeq(names []string) bool {
  267. var lastSeq int64
  268. for _, name := range names {
  269. curSeq, _, err := parseWalName(name)
  270. if err != nil {
  271. panic("parse correct name error")
  272. }
  273. if lastSeq != 0 && lastSeq != curSeq-1 {
  274. return false
  275. }
  276. lastSeq = curSeq
  277. }
  278. return true
  279. }
  280. func loadInfo(d []byte) (raft.Info, error) {
  281. var i raft.Info
  282. err := i.Unmarshal(d)
  283. if err != nil {
  284. panic(err)
  285. }
  286. return i, err
  287. }
  288. func loadEntry(d []byte) (raft.Entry, error) {
  289. var e raft.Entry
  290. err := e.Unmarshal(d)
  291. if err != nil {
  292. panic(err)
  293. }
  294. return e, err
  295. }
  296. func loadState(d []byte) (raft.State, error) {
  297. var s raft.State
  298. err := s.Unmarshal(d)
  299. return s, err
  300. }
  301. // readDir returns the filenames in wal directory.
  302. func readDir(dirpath string) ([]string, error) {
  303. dir, err := os.Open(dirpath)
  304. if err != nil {
  305. return nil, err
  306. }
  307. defer dir.Close()
  308. names, err := dir.Readdirnames(-1)
  309. if err != nil {
  310. return nil, err
  311. }
  312. return names, nil
  313. }
  314. func checkWalNames(names []string) []string {
  315. wnames := make([]string, 0)
  316. for _, name := range names {
  317. if _, _, err := parseWalName(name); err != nil {
  318. log.Printf("parse %s: %v", name, err)
  319. continue
  320. }
  321. wnames = append(wnames, name)
  322. }
  323. return wnames
  324. }
  325. func parseWalName(str string) (seq, index int64, err error) {
  326. var num int
  327. num, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
  328. if num != 2 && err == nil {
  329. err = fmt.Errorf("bad wal name: %s", str)
  330. }
  331. return
  332. }
  333. func max(a, b int64) int64 {
  334. if a > b {
  335. return a
  336. }
  337. return b
  338. }