wal.go 8.0 KB


  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wal
  14. import (
  15. "bufio"
  16. "bytes"
  17. "encoding/binary"
  18. "fmt"
  19. "io"
  20. "log"
  21. "os"
  22. "path"
  23. "sort"
  24. "github.com/coreos/etcd/raft"
  25. )
  26. const (
  27. infoType int64 = iota + 1
  28. entryType
  29. stateType
  30. )
  31. var (
  32. ErrIdMismatch = fmt.Errorf("unmatch id")
  33. ErrNotFound = fmt.Errorf("wal file is not found")
  34. )
  35. type WAL struct {
  36. f *os.File
  37. bw *bufio.Writer
  38. buf *bytes.Buffer
  39. }
  40. func newWAL(f *os.File) *WAL {
  41. return &WAL{f, bufio.NewWriter(f), new(bytes.Buffer)}
  42. }
  43. func Exist(dirpath string) bool {
  44. names, err := readDir(dirpath)
  45. if err != nil {
  46. return false
  47. }
  48. return len(names) != 0
  49. }
  50. func Create(dirpath string) (*WAL, error) {
  51. log.Printf("path=%s wal.create", dirpath)
  52. if Exist(dirpath) {
  53. return nil, os.ErrExist
  54. }
  55. p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
  56. f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  57. if err != nil {
  58. return nil, err
  59. }
  60. return newWAL(f), nil
  61. }
  62. func Open(dirpath string) (*WAL, error) {
  63. log.Printf("path=%s wal.append", dirpath)
  64. names, err := readDir(dirpath)
  65. if err != nil {
  66. return nil, err
  67. }
  68. names = checkWalNames(names)
  69. if len(names) == 0 {
  70. return nil, ErrNotFound
  71. }
  72. name := names[len(names)-1]
  73. p := path.Join(dirpath, name)
  74. f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND, 0)
  75. if err != nil {
  76. return nil, err
  77. }
  78. return newWAL(f), nil
  79. }
  80. // index should be the index of last log entry currently.
  81. // Cut closes current file written and creates a new one to append.
  82. func (w *WAL) Cut(index int64) error {
  83. log.Printf("path=%s wal.cut index=%d", w.f.Name(), index)
  84. fpath := w.f.Name()
  85. seq, _, err := parseWalName(path.Base(fpath))
  86. if err != nil {
  87. panic("parse correct name error")
  88. }
  89. fpath = path.Join(path.Dir(fpath), fmt.Sprintf("%016x-%016x.wal", seq+1, index))
  90. f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
  91. if err != nil {
  92. return err
  93. }
  94. w.Sync()
  95. w.f.Close()
  96. w.f = f
  97. w.bw = bufio.NewWriter(f)
  98. return nil
  99. }
  100. func (w *WAL) Sync() error {
  101. if err := w.bw.Flush(); err != nil {
  102. return err
  103. }
  104. return w.f.Sync()
  105. }
  106. func (w *WAL) Close() {
  107. log.Printf("path=%s wal.close", w.f.Name())
  108. if w.f != nil {
  109. w.Sync()
  110. w.f.Close()
  111. }
  112. }
  113. func (w *WAL) SaveInfo(i *raft.Info) error {
  114. log.Printf("path=%s wal.saveInfo id=%d", w.f.Name(), i.Id)
  115. if err := w.checkAtHead(); err != nil {
  116. return err
  117. }
  118. b, err := i.Marshal()
  119. if err != nil {
  120. panic(err)
  121. }
  122. rec := &Record{Type: infoType, Data: b}
  123. return writeRecord(w.bw, rec)
  124. }
  125. func (w *WAL) SaveEntry(e *raft.Entry) error {
  126. b, err := e.Marshal()
  127. if err != nil {
  128. panic(err)
  129. }
  130. rec := &Record{Type: entryType, Data: b}
  131. return writeRecord(w.bw, rec)
  132. }
  133. func (w *WAL) SaveState(s *raft.State) error {
  134. log.Printf("path=%s wal.saveState state=\"%+v\"", w.f.Name(), s)
  135. b, err := s.Marshal()
  136. if err != nil {
  137. panic(err)
  138. }
  139. rec := &Record{Type: stateType, Data: b}
  140. return writeRecord(w.bw, rec)
  141. }
  142. func (w *WAL) checkAtHead() error {
  143. o, err := w.f.Seek(0, os.SEEK_CUR)
  144. if err != nil {
  145. return err
  146. }
  147. if o != 0 || w.bw.Buffered() != 0 {
  148. return fmt.Errorf("cannot write info at %d, expect 0", max(o, int64(w.bw.Buffered())))
  149. }
  150. return nil
  151. }
  152. type Node struct {
  153. Id int64
  154. Ents []raft.Entry
  155. State raft.State
  156. // index of the first entry
  157. index int64
  158. }
  159. func newNode(index int64) *Node {
  160. return &Node{Ents: make([]raft.Entry, 0), index: index + 1}
  161. }
  162. func (n *Node) load(path string) error {
  163. f, err := os.Open(path)
  164. if err != nil {
  165. return err
  166. }
  167. defer f.Close()
  168. br := bufio.NewReader(f)
  169. rec := &Record{}
  170. err = readRecord(br, rec)
  171. if err != nil {
  172. return err
  173. }
  174. if rec.Type != infoType {
  175. return fmt.Errorf("the first block of wal is not infoType but %d", rec.Type)
  176. }
  177. i, err := loadInfo(rec.Data)
  178. if err != nil {
  179. return err
  180. }
  181. if n.Id != 0 && n.Id != i.Id {
  182. return ErrIdMismatch
  183. }
  184. n.Id = i.Id
  185. for err = readRecord(br, rec); err == nil; err = readRecord(br, rec) {
  186. switch rec.Type {
  187. case entryType:
  188. e, err := loadEntry(rec.Data)
  189. if err != nil {
  190. return err
  191. }
  192. if e.Index >= n.index {
  193. n.Ents = append(n.Ents[:e.Index-n.index], e)
  194. }
  195. case stateType:
  196. s, err := loadState(rec.Data)
  197. if err != nil {
  198. return err
  199. }
  200. n.State = s
  201. default:
  202. return fmt.Errorf("unexpected block type %d", rec.Type)
  203. }
  204. }
  205. if err != io.EOF {
  206. return err
  207. }
  208. return nil
  209. }
  210. func (n *Node) startFrom(index int64) error {
  211. diff := int(index - n.index)
  212. if diff > len(n.Ents) {
  213. return ErrNotFound
  214. }
  215. n.Ents = n.Ents[diff:]
  216. return nil
  217. }
  218. // Read loads all entries after index (index is not included).
  219. func Read(dirpath string, index int64) (*Node, error) {
  220. log.Printf("path=%s wal.load index=%d", dirpath, index)
  221. names, err := readDir(dirpath)
  222. if err != nil {
  223. return nil, err
  224. }
  225. names = checkWalNames(names)
  226. if len(names) == 0 {
  227. return nil, ErrNotFound
  228. }
  229. sort.Sort(sort.StringSlice(names))
  230. nameIndex, ok := searchIndex(names, index)
  231. if !ok || !isValidSeq(names[nameIndex:]) {
  232. return nil, ErrNotFound
  233. }
  234. _, initIndex, err := parseWalName(names[nameIndex])
  235. if err != nil {
  236. panic("parse correct name error")
  237. }
  238. n := newNode(initIndex)
  239. for _, name := range names[nameIndex:] {
  240. if err := n.load(path.Join(dirpath, name)); err != nil {
  241. return nil, err
  242. }
  243. }
  244. if err := n.startFrom(index + 1); err != nil {
  245. return nil, ErrNotFound
  246. }
  247. return n, nil
  248. }
  249. // The input names should be sorted.
  250. // serachIndex returns the array index of the last name that has
  251. // a smaller raft index section than the given raft index.
  252. func searchIndex(names []string, index int64) (int, bool) {
  253. for i := len(names) - 1; i >= 0; i-- {
  254. name := names[i]
  255. _, curIndex, err := parseWalName(name)
  256. if err != nil {
  257. panic("parse correct name error")
  258. }
  259. if index >= curIndex {
  260. return i, true
  261. }
  262. }
  263. return -1, false
  264. }
  265. // names should have been sorted based on sequence number.
  266. // isValidSeq checks whether seq increases continuously.
  267. func isValidSeq(names []string) bool {
  268. var lastSeq int64
  269. for _, name := range names {
  270. curSeq, _, err := parseWalName(name)
  271. if err != nil {
  272. panic("parse correct name error")
  273. }
  274. if lastSeq != 0 && lastSeq != curSeq-1 {
  275. return false
  276. }
  277. lastSeq = curSeq
  278. }
  279. return true
  280. }
  281. func loadInfo(d []byte) (raft.Info, error) {
  282. var i raft.Info
  283. err := i.Unmarshal(d)
  284. if err != nil {
  285. panic(err)
  286. }
  287. return i, err
  288. }
  289. func loadEntry(d []byte) (raft.Entry, error) {
  290. var e raft.Entry
  291. err := e.Unmarshal(d)
  292. if err != nil {
  293. panic(err)
  294. }
  295. return e, err
  296. }
  297. func loadState(d []byte) (raft.State, error) {
  298. var s raft.State
  299. err := s.Unmarshal(d)
  300. return s, err
  301. }
  302. // readDir returns the filenames in wal directory.
  303. func readDir(dirpath string) ([]string, error) {
  304. dir, err := os.Open(dirpath)
  305. if err != nil {
  306. return nil, err
  307. }
  308. defer dir.Close()
  309. names, err := dir.Readdirnames(-1)
  310. if err != nil {
  311. return nil, err
  312. }
  313. return names, nil
  314. }
  315. func checkWalNames(names []string) []string {
  316. wnames := make([]string, 0)
  317. for _, name := range names {
  318. if _, _, err := parseWalName(name); err != nil {
  319. log.Printf("parse %s: %v", name, err)
  320. continue
  321. }
  322. wnames = append(wnames, name)
  323. }
  324. return wnames
  325. }
  326. func parseWalName(str string) (seq, index int64, err error) {
  327. var num int
  328. num, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
  329. if num != 2 && err == nil {
  330. err = fmt.Errorf("bad wal name: %s", str)
  331. }
  332. return
  333. }
  334. func writeInt64(w io.Writer, n int64) error {
  335. return binary.Write(w, binary.LittleEndian, n)
  336. }
  337. func readInt64(r io.Reader) (int64, error) {
  338. var n int64
  339. err := binary.Read(r, binary.LittleEndian, &n)
  340. return n, err
  341. }
  342. func max(a, b int64) int64 {
  343. if a > b {
  344. return a
  345. }
  346. return b
  347. }