wal_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package wal
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "math"
  21. "os"
  22. "path"
  23. "path/filepath"
  24. "reflect"
  25. "regexp"
  26. "testing"
  27. "go.etcd.io/etcd/pkg/fileutil"
  28. "go.etcd.io/etcd/pkg/pbutil"
  29. "go.etcd.io/etcd/raft/raftpb"
  30. "go.etcd.io/etcd/wal/walpb"
  31. "go.uber.org/zap"
  32. )
  33. func TestNew(t *testing.T) {
  34. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. defer os.RemoveAll(p)
  39. w, err := Create(zap.NewExample(), p, []byte("somedata"))
  40. if err != nil {
  41. t.Fatalf("err = %v, want nil", err)
  42. }
  43. if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
  44. t.Errorf("name = %+v, want %+v", g, walName(0, 0))
  45. }
  46. defer w.Close()
  47. // file is preallocated to segment size; only read data written by wal
  48. off, err := w.tail().Seek(0, io.SeekCurrent)
  49. if err != nil {
  50. t.Fatal(err)
  51. }
  52. gd := make([]byte, off)
  53. f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. defer f.Close()
  58. if _, err = io.ReadFull(f, gd); err != nil {
  59. t.Fatalf("err = %v, want nil", err)
  60. }
  61. var wb bytes.Buffer
  62. e := newEncoder(&wb, 0, 0)
  63. err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
  64. if err != nil {
  65. t.Fatalf("err = %v, want nil", err)
  66. }
  67. err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")})
  68. if err != nil {
  69. t.Fatalf("err = %v, want nil", err)
  70. }
  71. r := &walpb.Record{
  72. Type: snapshotType,
  73. Data: pbutil.MustMarshal(&walpb.Snapshot{}),
  74. }
  75. if err = e.encode(r); err != nil {
  76. t.Fatalf("err = %v, want nil", err)
  77. }
  78. e.flush()
  79. if !bytes.Equal(gd, wb.Bytes()) {
  80. t.Errorf("data = %v, want %v", gd, wb.Bytes())
  81. }
  82. }
  83. func TestCreateFailFromPollutedDir(t *testing.T) {
  84. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  85. if err != nil {
  86. t.Fatal(err)
  87. }
  88. defer os.RemoveAll(p)
  89. ioutil.WriteFile(filepath.Join(p, "test.wal"), []byte("data"), os.ModeTemporary)
  90. _, err = Create(zap.NewExample(), p, []byte("data"))
  91. if err != os.ErrExist {
  92. t.Fatalf("expected %v, got %v", os.ErrExist, err)
  93. }
  94. }
  95. func TestWalCleanup(t *testing.T) {
  96. testRoot, err := ioutil.TempDir(os.TempDir(), "waltestroot")
  97. if err != nil {
  98. t.Fatal(err)
  99. }
  100. p, err := ioutil.TempDir(testRoot, "waltest")
  101. if err != nil {
  102. t.Fatal(err)
  103. }
  104. defer os.RemoveAll(testRoot)
  105. logger := zap.NewExample()
  106. w, err := Create(logger, p, []byte(""))
  107. if err != nil {
  108. t.Fatalf("err = %v, want nil", err)
  109. }
  110. w.cleanupWAL(logger)
  111. fnames, err := fileutil.ReadDir(testRoot)
  112. if err != nil {
  113. t.Fatalf("err = %v, want nil", err)
  114. }
  115. if len(fnames) != 1 {
  116. t.Fatalf("expected 1 file under %v, got %v", testRoot, len(fnames))
  117. }
  118. pattern := fmt.Sprintf(`%s.broken\.[\d]{8}\.[\d]{6}\.[\d]{1,6}?`, filepath.Base(p))
  119. match, _ := regexp.MatchString(pattern, fnames[0])
  120. if !match {
  121. t.Errorf("match = false, expected true for %v with pattern %v", fnames[0], pattern)
  122. }
  123. }
  124. func TestCreateFailFromNoSpaceLeft(t *testing.T) {
  125. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. defer os.RemoveAll(p)
  130. oldSegmentSizeBytes := SegmentSizeBytes
  131. defer func() {
  132. SegmentSizeBytes = oldSegmentSizeBytes
  133. }()
  134. SegmentSizeBytes = math.MaxInt64
  135. _, err = Create(zap.NewExample(), p, []byte("data"))
  136. if err == nil { // no space left on device
  137. t.Fatalf("expected error 'no space left on device', got nil")
  138. }
  139. }
  140. func TestNewForInitedDir(t *testing.T) {
  141. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. defer os.RemoveAll(p)
  146. os.Create(filepath.Join(p, walName(0, 0)))
  147. if _, err = Create(zap.NewExample(), p, nil); err == nil || err != os.ErrExist {
  148. t.Errorf("err = %v, want %v", err, os.ErrExist)
  149. }
  150. }
  151. func TestOpenAtIndex(t *testing.T) {
  152. dir, err := ioutil.TempDir(os.TempDir(), "waltest")
  153. if err != nil {
  154. t.Fatal(err)
  155. }
  156. defer os.RemoveAll(dir)
  157. f, err := os.Create(filepath.Join(dir, walName(0, 0)))
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. f.Close()
  162. w, err := Open(zap.NewExample(), dir, walpb.Snapshot{})
  163. if err != nil {
  164. t.Fatalf("err = %v, want nil", err)
  165. }
  166. if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
  167. t.Errorf("name = %+v, want %+v", g, walName(0, 0))
  168. }
  169. if w.seq() != 0 {
  170. t.Errorf("seq = %d, want %d", w.seq(), 0)
  171. }
  172. w.Close()
  173. wname := walName(2, 10)
  174. f, err = os.Create(filepath.Join(dir, wname))
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. f.Close()
  179. w, err = Open(zap.NewExample(), dir, walpb.Snapshot{Index: 5})
  180. if err != nil {
  181. t.Fatalf("err = %v, want nil", err)
  182. }
  183. if g := filepath.Base(w.tail().Name()); g != wname {
  184. t.Errorf("name = %+v, want %+v", g, wname)
  185. }
  186. if w.seq() != 2 {
  187. t.Errorf("seq = %d, want %d", w.seq(), 2)
  188. }
  189. w.Close()
  190. emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty")
  191. if err != nil {
  192. t.Fatal(err)
  193. }
  194. defer os.RemoveAll(emptydir)
  195. if _, err = Open(zap.NewExample(), emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
  196. t.Errorf("err = %v, want %v", err, ErrFileNotFound)
  197. }
  198. }
  199. // TestVerify tests that Verify throws a non-nil error when the WAL is corrupted.
  200. // The test creates a WAL directory and cuts out multiple WAL files. Then
  201. // it corrupts one of the files by completely truncating it.
  202. func TestVerify(t *testing.T) {
  203. walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. defer os.RemoveAll(walDir)
  208. // create WAL
  209. w, err := Create(zap.NewExample(), walDir, nil)
  210. if err != nil {
  211. t.Fatal(err)
  212. }
  213. defer w.Close()
  214. // make 5 separate files
  215. for i := 0; i < 5; i++ {
  216. es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(i+1))}}
  217. if err = w.Save(raftpb.HardState{}, es); err != nil {
  218. t.Fatal(err)
  219. }
  220. if err = w.cut(); err != nil {
  221. t.Fatal(err)
  222. }
  223. }
  224. // to verify the WAL is not corrupted at this point
  225. err = Verify(zap.NewExample(), walDir, walpb.Snapshot{})
  226. if err != nil {
  227. t.Errorf("expected a nil error, got %v", err)
  228. }
  229. walFiles, err := ioutil.ReadDir(walDir)
  230. if err != nil {
  231. t.Fatal(err)
  232. }
  233. // corrupt the WAL by truncating one of the WAL files completely
  234. err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0)
  235. if err != nil {
  236. t.Fatal(err)
  237. }
  238. err = Verify(zap.NewExample(), walDir, walpb.Snapshot{})
  239. if err == nil {
  240. t.Error("expected a non-nil error, got nil")
  241. }
  242. }
  243. // TODO: split it into smaller tests for better readability
  244. func TestCut(t *testing.T) {
  245. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. defer os.RemoveAll(p)
  250. w, err := Create(zap.NewExample(), p, nil)
  251. if err != nil {
  252. t.Fatal(err)
  253. }
  254. defer w.Close()
  255. state := raftpb.HardState{Term: 1}
  256. if err = w.Save(state, nil); err != nil {
  257. t.Fatal(err)
  258. }
  259. if err = w.cut(); err != nil {
  260. t.Fatal(err)
  261. }
  262. wname := walName(1, 1)
  263. if g := filepath.Base(w.tail().Name()); g != wname {
  264. t.Errorf("name = %s, want %s", g, wname)
  265. }
  266. es := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}}
  267. if err = w.Save(raftpb.HardState{}, es); err != nil {
  268. t.Fatal(err)
  269. }
  270. if err = w.cut(); err != nil {
  271. t.Fatal(err)
  272. }
  273. snap := walpb.Snapshot{Index: 2, Term: 1}
  274. if err = w.SaveSnapshot(snap); err != nil {
  275. t.Fatal(err)
  276. }
  277. wname = walName(2, 2)
  278. if g := filepath.Base(w.tail().Name()); g != wname {
  279. t.Errorf("name = %s, want %s", g, wname)
  280. }
  281. // check the state in the last WAL
  282. // We do check before closing the WAL to ensure that Cut syncs the data
  283. // into the disk.
  284. f, err := os.Open(filepath.Join(p, wname))
  285. if err != nil {
  286. t.Fatal(err)
  287. }
  288. defer f.Close()
  289. nw := &WAL{
  290. decoder: newDecoder(f),
  291. start: snap,
  292. }
  293. _, gst, _, err := nw.ReadAll()
  294. if err != nil {
  295. t.Fatal(err)
  296. }
  297. if !reflect.DeepEqual(gst, state) {
  298. t.Errorf("state = %+v, want %+v", gst, state)
  299. }
  300. }
  301. func TestSaveWithCut(t *testing.T) {
  302. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. defer os.RemoveAll(p)
  307. w, err := Create(zap.NewExample(), p, []byte("metadata"))
  308. if err != nil {
  309. t.Fatal(err)
  310. }
  311. state := raftpb.HardState{Term: 1}
  312. if err = w.Save(state, nil); err != nil {
  313. t.Fatal(err)
  314. }
  315. bigData := make([]byte, 500)
  316. strdata := "Hello World!!"
  317. copy(bigData, strdata)
  318. // set a lower value for SegmentSizeBytes, else the test takes too long to complete
  319. restoreLater := SegmentSizeBytes
  320. const EntrySize int = 500
  321. SegmentSizeBytes = 2 * 1024
  322. defer func() { SegmentSizeBytes = restoreLater }()
  323. index := uint64(0)
  324. for totalSize := 0; totalSize < int(SegmentSizeBytes); totalSize += EntrySize {
  325. ents := []raftpb.Entry{{Index: index, Term: 1, Data: bigData}}
  326. if err = w.Save(state, ents); err != nil {
  327. t.Fatal(err)
  328. }
  329. index++
  330. }
  331. w.Close()
  332. neww, err := Open(zap.NewExample(), p, walpb.Snapshot{})
  333. if err != nil {
  334. t.Fatalf("err = %v, want nil", err)
  335. }
  336. defer neww.Close()
  337. wname := walName(1, index)
  338. if g := filepath.Base(neww.tail().Name()); g != wname {
  339. t.Errorf("name = %s, want %s", g, wname)
  340. }
  341. _, newhardstate, entries, err := neww.ReadAll()
  342. if err != nil {
  343. t.Fatal(err)
  344. }
  345. if !reflect.DeepEqual(newhardstate, state) {
  346. t.Errorf("Hard State = %+v, want %+v", newhardstate, state)
  347. }
  348. if len(entries) != int(SegmentSizeBytes/int64(EntrySize)) {
  349. t.Errorf("Number of entries = %d, expected = %d", len(entries), int(SegmentSizeBytes/int64(EntrySize)))
  350. }
  351. for _, oneent := range entries {
  352. if !bytes.Equal(oneent.Data, bigData) {
  353. t.Errorf("the saved data does not match at Index %d : found: %s , want :%s", oneent.Index, oneent.Data, bigData)
  354. }
  355. }
  356. }
  357. func TestRecover(t *testing.T) {
  358. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  359. if err != nil {
  360. t.Fatal(err)
  361. }
  362. defer os.RemoveAll(p)
  363. w, err := Create(zap.NewExample(), p, []byte("metadata"))
  364. if err != nil {
  365. t.Fatal(err)
  366. }
  367. if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
  368. t.Fatal(err)
  369. }
  370. ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
  371. if err = w.Save(raftpb.HardState{}, ents); err != nil {
  372. t.Fatal(err)
  373. }
  374. sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
  375. for _, s := range sts {
  376. if err = w.Save(s, nil); err != nil {
  377. t.Fatal(err)
  378. }
  379. }
  380. w.Close()
  381. if w, err = Open(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
  382. t.Fatal(err)
  383. }
  384. metadata, state, entries, err := w.ReadAll()
  385. if err != nil {
  386. t.Fatal(err)
  387. }
  388. if !bytes.Equal(metadata, []byte("metadata")) {
  389. t.Errorf("metadata = %s, want %s", metadata, "metadata")
  390. }
  391. if !reflect.DeepEqual(entries, ents) {
  392. t.Errorf("ents = %+v, want %+v", entries, ents)
  393. }
  394. // only the latest state is recorded
  395. s := sts[len(sts)-1]
  396. if !reflect.DeepEqual(state, s) {
  397. t.Errorf("state = %+v, want %+v", state, s)
  398. }
  399. w.Close()
  400. }
  401. func TestSearchIndex(t *testing.T) {
  402. tests := []struct {
  403. names []string
  404. index uint64
  405. widx int
  406. wok bool
  407. }{
  408. {
  409. []string{
  410. "0000000000000000-0000000000000000.wal",
  411. "0000000000000001-0000000000001000.wal",
  412. "0000000000000002-0000000000002000.wal",
  413. },
  414. 0x1000, 1, true,
  415. },
  416. {
  417. []string{
  418. "0000000000000001-0000000000004000.wal",
  419. "0000000000000002-0000000000003000.wal",
  420. "0000000000000003-0000000000005000.wal",
  421. },
  422. 0x4000, 1, true,
  423. },
  424. {
  425. []string{
  426. "0000000000000001-0000000000002000.wal",
  427. "0000000000000002-0000000000003000.wal",
  428. "0000000000000003-0000000000005000.wal",
  429. },
  430. 0x1000, -1, false,
  431. },
  432. }
  433. for i, tt := range tests {
  434. idx, ok := searchIndex(zap.NewExample(), tt.names, tt.index)
  435. if idx != tt.widx {
  436. t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx)
  437. }
  438. if ok != tt.wok {
  439. t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
  440. }
  441. }
  442. }
  443. func TestScanWalName(t *testing.T) {
  444. tests := []struct {
  445. str string
  446. wseq, windex uint64
  447. wok bool
  448. }{
  449. {"0000000000000000-0000000000000000.wal", 0, 0, true},
  450. {"0000000000000000.wal", 0, 0, false},
  451. {"0000000000000000-0000000000000000.snap", 0, 0, false},
  452. }
  453. for i, tt := range tests {
  454. s, index, err := parseWALName(tt.str)
  455. if g := err == nil; g != tt.wok {
  456. t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok)
  457. }
  458. if s != tt.wseq {
  459. t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq)
  460. }
  461. if index != tt.windex {
  462. t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
  463. }
  464. }
  465. }
  466. func TestRecoverAfterCut(t *testing.T) {
  467. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  468. if err != nil {
  469. t.Fatal(err)
  470. }
  471. defer os.RemoveAll(p)
  472. md, err := Create(zap.NewExample(), p, []byte("metadata"))
  473. if err != nil {
  474. t.Fatal(err)
  475. }
  476. for i := 0; i < 10; i++ {
  477. if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
  478. t.Fatal(err)
  479. }
  480. es := []raftpb.Entry{{Index: uint64(i)}}
  481. if err = md.Save(raftpb.HardState{}, es); err != nil {
  482. t.Fatal(err)
  483. }
  484. if err = md.cut(); err != nil {
  485. t.Fatal(err)
  486. }
  487. }
  488. md.Close()
  489. if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
  490. t.Fatal(err)
  491. }
  492. for i := 0; i < 10; i++ {
  493. w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)})
  494. if err != nil {
  495. if i <= 4 {
  496. if err != ErrFileNotFound {
  497. t.Errorf("#%d: err = %v, want %v", i, err, ErrFileNotFound)
  498. }
  499. } else {
  500. t.Errorf("#%d: err = %v, want nil", i, err)
  501. }
  502. continue
  503. }
  504. metadata, _, entries, err := w.ReadAll()
  505. if err != nil {
  506. t.Errorf("#%d: err = %v, want nil", i, err)
  507. continue
  508. }
  509. if !bytes.Equal(metadata, []byte("metadata")) {
  510. t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
  511. }
  512. for j, e := range entries {
  513. if e.Index != uint64(j+i+1) {
  514. t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
  515. }
  516. }
  517. w.Close()
  518. }
  519. }
  520. func TestOpenAtUncommittedIndex(t *testing.T) {
  521. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  522. if err != nil {
  523. t.Fatal(err)
  524. }
  525. defer os.RemoveAll(p)
  526. w, err := Create(zap.NewExample(), p, nil)
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
  531. t.Fatal(err)
  532. }
  533. if err = w.Save(raftpb.HardState{}, []raftpb.Entry{{Index: 0}}); err != nil {
  534. t.Fatal(err)
  535. }
  536. w.Close()
  537. w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
  538. if err != nil {
  539. t.Fatal(err)
  540. }
  541. // commit up to index 0, try to read index 1
  542. if _, _, _, err = w.ReadAll(); err != nil {
  543. t.Errorf("err = %v, want nil", err)
  544. }
  545. w.Close()
  546. }
  547. // TestOpenForRead tests that OpenForRead can load all files.
  548. // The tests creates WAL directory, and cut out multiple WAL files. Then
  549. // it releases the lock of part of data, and excepts that OpenForRead
  550. // can read out all files even if some are locked for write.
  551. func TestOpenForRead(t *testing.T) {
  552. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  553. if err != nil {
  554. t.Fatal(err)
  555. }
  556. defer os.RemoveAll(p)
  557. // create WAL
  558. w, err := Create(zap.NewExample(), p, nil)
  559. if err != nil {
  560. t.Fatal(err)
  561. }
  562. defer w.Close()
  563. // make 10 separate files
  564. for i := 0; i < 10; i++ {
  565. es := []raftpb.Entry{{Index: uint64(i)}}
  566. if err = w.Save(raftpb.HardState{}, es); err != nil {
  567. t.Fatal(err)
  568. }
  569. if err = w.cut(); err != nil {
  570. t.Fatal(err)
  571. }
  572. }
  573. // release the lock to 5
  574. unlockIndex := uint64(5)
  575. w.ReleaseLockTo(unlockIndex)
  576. // All are available for read
  577. w2, err := OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
  578. if err != nil {
  579. t.Fatal(err)
  580. }
  581. defer w2.Close()
  582. _, _, ents, err := w2.ReadAll()
  583. if err != nil {
  584. t.Fatalf("err = %v, want nil", err)
  585. }
  586. if g := ents[len(ents)-1].Index; g != 9 {
  587. t.Errorf("last index read = %d, want %d", g, 9)
  588. }
  589. }
  590. func TestSaveEmpty(t *testing.T) {
  591. var buf bytes.Buffer
  592. var est raftpb.HardState
  593. w := WAL{
  594. encoder: newEncoder(&buf, 0, 0),
  595. }
  596. if err := w.saveState(&est); err != nil {
  597. t.Errorf("err = %v, want nil", err)
  598. }
  599. if len(buf.Bytes()) != 0 {
  600. t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes()))
  601. }
  602. }
  603. func TestReleaseLockTo(t *testing.T) {
  604. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  605. if err != nil {
  606. t.Fatal(err)
  607. }
  608. defer os.RemoveAll(p)
  609. // create WAL
  610. w, err := Create(zap.NewExample(), p, nil)
  611. defer func() {
  612. if err = w.Close(); err != nil {
  613. t.Fatal(err)
  614. }
  615. }()
  616. if err != nil {
  617. t.Fatal(err)
  618. }
  619. // release nothing if no files
  620. err = w.ReleaseLockTo(10)
  621. if err != nil {
  622. t.Errorf("err = %v, want nil", err)
  623. }
  624. // make 10 separate files
  625. for i := 0; i < 10; i++ {
  626. es := []raftpb.Entry{{Index: uint64(i)}}
  627. if err = w.Save(raftpb.HardState{}, es); err != nil {
  628. t.Fatal(err)
  629. }
  630. if err = w.cut(); err != nil {
  631. t.Fatal(err)
  632. }
  633. }
  634. // release the lock to 5
  635. unlockIndex := uint64(5)
  636. w.ReleaseLockTo(unlockIndex)
  637. // expected remaining are 4,5,6,7,8,9,10
  638. if len(w.locks) != 7 {
  639. t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7)
  640. }
  641. for i, l := range w.locks {
  642. var lockIndex uint64
  643. _, lockIndex, err = parseWALName(filepath.Base(l.Name()))
  644. if err != nil {
  645. t.Fatal(err)
  646. }
  647. if lockIndex != uint64(i+4) {
  648. t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4))
  649. }
  650. }
  651. // release the lock to 15
  652. unlockIndex = uint64(15)
  653. w.ReleaseLockTo(unlockIndex)
  654. // expected remaining is 10
  655. if len(w.locks) != 1 {
  656. t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
  657. }
  658. _, lockIndex, err := parseWALName(filepath.Base(w.locks[0].Name()))
  659. if err != nil {
  660. t.Fatal(err)
  661. }
  662. if lockIndex != uint64(10) {
  663. t.Errorf("lockindex = %d, want %d", lockIndex, 10)
  664. }
  665. }
  666. // TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space.
  667. func TestTailWriteNoSlackSpace(t *testing.T) {
  668. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  669. if err != nil {
  670. t.Fatal(err)
  671. }
  672. defer os.RemoveAll(p)
  673. // create initial WAL
  674. w, err := Create(zap.NewExample(), p, []byte("metadata"))
  675. if err != nil {
  676. t.Fatal(err)
  677. }
  678. // write some entries
  679. for i := 1; i <= 5; i++ {
  680. es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
  681. if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
  682. t.Fatal(err)
  683. }
  684. }
  685. // get rid of slack space by truncating file
  686. off, serr := w.tail().Seek(0, io.SeekCurrent)
  687. if serr != nil {
  688. t.Fatal(serr)
  689. }
  690. if terr := w.tail().Truncate(off); terr != nil {
  691. t.Fatal(terr)
  692. }
  693. w.Close()
  694. // open, write more
  695. w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
  696. if err != nil {
  697. t.Fatal(err)
  698. }
  699. _, _, ents, rerr := w.ReadAll()
  700. if rerr != nil {
  701. t.Fatal(rerr)
  702. }
  703. if len(ents) != 5 {
  704. t.Fatalf("got entries %+v, expected 5 entries", ents)
  705. }
  706. // write more entries
  707. for i := 6; i <= 10; i++ {
  708. es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
  709. if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
  710. t.Fatal(err)
  711. }
  712. }
  713. w.Close()
  714. // confirm all writes
  715. w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
  716. if err != nil {
  717. t.Fatal(err)
  718. }
  719. _, _, ents, rerr = w.ReadAll()
  720. if rerr != nil {
  721. t.Fatal(rerr)
  722. }
  723. if len(ents) != 10 {
  724. t.Fatalf("got entries %+v, expected 10 entries", ents)
  725. }
  726. w.Close()
  727. }
  728. // TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart
  729. func TestRestartCreateWal(t *testing.T) {
  730. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  731. if err != nil {
  732. t.Fatal(err)
  733. }
  734. defer os.RemoveAll(p)
  735. // make temporary directory so it looks like initialization is interrupted
  736. tmpdir := filepath.Clean(p) + ".tmp"
  737. if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
  738. t.Fatal(err)
  739. }
  740. if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
  741. t.Fatal(err)
  742. }
  743. w, werr := Create(zap.NewExample(), p, []byte("abc"))
  744. if werr != nil {
  745. t.Fatal(werr)
  746. }
  747. w.Close()
  748. if Exist(tmpdir) {
  749. t.Fatalf("got %q exists, expected it to not exist", tmpdir)
  750. }
  751. if w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
  752. t.Fatal(err)
  753. }
  754. defer w.Close()
  755. if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" {
  756. t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
  757. }
  758. }
  759. // TestOpenOnTornWrite ensures that entries past the torn write are truncated.
  760. func TestOpenOnTornWrite(t *testing.T) {
  761. maxEntries := 40
  762. clobberIdx := 20
  763. overwriteEntries := 5
  764. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  765. if err != nil {
  766. t.Fatal(err)
  767. }
  768. defer os.RemoveAll(p)
  769. w, err := Create(zap.NewExample(), p, nil)
  770. defer func() {
  771. if err = w.Close(); err != nil && err != os.ErrInvalid {
  772. t.Fatal(err)
  773. }
  774. }()
  775. if err != nil {
  776. t.Fatal(err)
  777. }
  778. // get offset of end of each saved entry
  779. offsets := make([]int64, maxEntries)
  780. for i := range offsets {
  781. es := []raftpb.Entry{{Index: uint64(i)}}
  782. if err = w.Save(raftpb.HardState{}, es); err != nil {
  783. t.Fatal(err)
  784. }
  785. if offsets[i], err = w.tail().Seek(0, io.SeekCurrent); err != nil {
  786. t.Fatal(err)
  787. }
  788. }
  789. fn := filepath.Join(p, filepath.Base(w.tail().Name()))
  790. w.Close()
  791. // clobber some entry with 0's to simulate a torn write
  792. f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
  793. if ferr != nil {
  794. t.Fatal(ferr)
  795. }
  796. defer f.Close()
  797. _, err = f.Seek(offsets[clobberIdx], io.SeekStart)
  798. if err != nil {
  799. t.Fatal(err)
  800. }
  801. zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
  802. _, err = f.Write(zeros)
  803. if err != nil {
  804. t.Fatal(err)
  805. }
  806. f.Close()
  807. w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
  808. if err != nil {
  809. t.Fatal(err)
  810. }
  811. // seek up to clobbered entry
  812. _, _, _, err = w.ReadAll()
  813. if err != nil {
  814. t.Fatal(err)
  815. }
  816. // write a few entries past the clobbered entry
  817. for i := 0; i < overwriteEntries; i++ {
  818. // Index is different from old, truncated entries
  819. es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
  820. if err = w.Save(raftpb.HardState{}, es); err != nil {
  821. t.Fatal(err)
  822. }
  823. }
  824. w.Close()
  825. // read back the entries, confirm number of entries matches expectation
  826. w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
  827. if err != nil {
  828. t.Fatal(err)
  829. }
  830. _, _, ents, rerr := w.ReadAll()
  831. if rerr != nil {
  832. // CRC error? the old entries were likely never truncated away
  833. t.Fatal(rerr)
  834. }
  835. wEntries := (clobberIdx - 1) + overwriteEntries
  836. if len(ents) != wEntries {
  837. t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
  838. }
  839. }
  840. func TestRenameFail(t *testing.T) {
  841. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  842. if err != nil {
  843. t.Fatal(err)
  844. }
  845. defer os.RemoveAll(p)
  846. oldSegmentSizeBytes := SegmentSizeBytes
  847. defer func() {
  848. SegmentSizeBytes = oldSegmentSizeBytes
  849. }()
  850. SegmentSizeBytes = math.MaxInt64
  851. tp, terr := ioutil.TempDir(os.TempDir(), "waltest")
  852. if terr != nil {
  853. t.Fatal(terr)
  854. }
  855. os.RemoveAll(tp)
  856. w := &WAL{
  857. lg: zap.NewExample(),
  858. dir: p,
  859. }
  860. w2, werr := w.renameWAL(tp)
  861. if w2 != nil || werr == nil { // os.Rename should fail from 'no such file or directory'
  862. t.Fatalf("expected error, got %v", werr)
  863. }
  864. }