Преглед изворни кода

wal: support auto-cut in wal

WAL should control the cut logic itself. We want to do falloc to
per allocate the space for a segmented wal file at the beginning
and cut it when it size reaches the limit.
Xiang Li пре 10 година
родитељ
комит
86429264fb
6 измењених фајлова са 32 додато и 35 уклоњено
  1. 0 4
      etcdserver/server.go
  2. 5 12
      etcdserver/server_test.go
  3. 0 5
      etcdserver/storage.go
  4. 6 7
      wal/doc.go
  5. 17 3
      wal/wal.go
  6. 4 4
      wal/wal_test.go

+ 0 - 4
etcdserver/server.go

@@ -843,10 +843,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 		}
 		log.Panicf("etcdserver: unexpected compaction error %v", err)
 	}
-	log.Printf("etcdserver: compacted log at index %d", snapi)
-	if err := s.r.storage.Cut(); err != nil {
-		log.Panicf("etcdserver: rotate wal file should never fail: %v", err)
-	}
 	log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
 }
 

+ 5 - 12
etcdserver/server_test.go

@@ -720,15 +720,12 @@ func TestSnapshot(t *testing.T) {
 		t.Errorf("action = %s, want Save", gaction[0])
 	}
 	gaction = p.Action()
-	if len(gaction) != 2 {
-		t.Fatalf("len(action) = %d, want 2", len(gaction))
+	if len(gaction) != 1 {
+		t.Fatalf("len(action) = %d, want 1", len(gaction))
 	}
 	if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
 		t.Errorf("action = %s, want SaveSnap", gaction[0])
 	}
-	if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Cut"}) {
-		t.Errorf("action = %s, want Cut", gaction[1])
-	}
 }
 
 // Applied > SnapCount should trigger a SaveSnap event
@@ -755,12 +752,12 @@ func TestTriggerSnap(t *testing.T) {
 
 	gaction := p.Action()
 	// each operation is recorded as a Save
-	// (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + SaveSnap + CUT
-	wcnt := 3 + snapc
+	// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
+	wcnt := 2 + snapc
 	if len(gaction) != wcnt {
 		t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
 	}
-	if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) {
+	if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
 		t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
 	}
 }
@@ -1267,10 +1264,6 @@ func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 	p.Record(testutil.Action{Name: "Save"})
 	return nil
 }
-func (p *storageRecorder) Cut() error {
-	p.Record(testutil.Action{Name: "Cut"})
-	return nil
-}
 func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
 	if !raft.IsEmptySnap(st) {
 		p.Record(testutil.Action{Name: "SaveSnap"})

+ 0 - 5
etcdserver/storage.go

@@ -35,11 +35,6 @@ type Storage interface {
 	Save(st raftpb.HardState, ents []raftpb.Entry) error
 	// SaveSnap function saves snapshot to the underlying stable storage.
 	SaveSnap(snap raftpb.Snapshot) error
-
-	// TODO: WAL should be able to control cut itself. After implement self-controlled cut,
-	// remove it in this interface.
-	// Cut cuts out a new wal file for saving new state and entries.
-	Cut() error
 	// Close closes the Storage and performs finalization.
 	Close() error
 }

+ 6 - 7
wal/doc.go

@@ -17,7 +17,7 @@ Package wal provides an implementation of a write ahead log that is used by
 etcd.
 
 A WAL is created at a particular directory and is made up of a number of
-discrete WAL files. Inside of each file the raft state and entries are appended
+segmented WAL files. Inside of each file the raft state and entries are appended
 to it with the Save method:
 
 	metadata := []byte{}
@@ -41,12 +41,11 @@ The first WAL file to be created will be 0000000000000000-0000000000000000.wal
 indicating an initial sequence of 0 and an initial raft index of 0. The first
 entry written to WAL MUST have raft index 0.
 
-Periodically a user will want to "cut" the WAL and place new entries into a new
-file. This will increment an internal sequence number and cause a new file to
-be created. If the last raft index saved was 0x20 and this is the first time
-Cut has been called on this WAL then the sequence will increment from 0x0 to
-0x1. The new file will be: 0000000000000001-0000000000000021.wal. If a second
-Cut issues 0x10 entries with incremental index later then the file will be called:
+WAL will cuts its current wal files if its size exceeds 8MB. This will increment an internal
+sequence number and cause a new file to be created. If the last raft index saved
+was 0x20 and this is the first time cut has been called on this WAL then the sequence will
+increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal.
+If a second cut issues 0x10 entries with incremental index later then the file will be called:
 0000000000000002-0000000000000031.wal.
 
 At a later time a WAL can be opened at a particular snapshot. If there is no

+ 17 - 3
wal/wal.go

@@ -41,6 +41,10 @@ const (
 
 	// the owner can make/remove files inside the directory
 	privateDirMode = 0700
+
+	// the expected size of each wal segment file.
+	// the actual size might be bigger than it.
+	segmentSizeBytes = 64 * 1000 * 1000 // 64MB
 )
 
 var (
@@ -274,14 +278,15 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 	return metadata, state, ents, err
 }
 
-// Cut closes current file written and creates a new one ready to append.
-func (w *WAL) Cut() error {
+// cut closes current file written and creates a new one ready to append.
+func (w *WAL) cut() error {
 	// create a new wal file with name sequence + 1
 	fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
 	f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 		return err
 	}
+	log.Printf("wal: segmented wal file %v is created", fpath)
 	l, err := fileutil.NewLock(f.Name())
 	if err != nil {
 		return err
@@ -402,7 +407,16 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 			return err
 		}
 	}
-	return w.sync()
+
+	fstat, err := w.f.Stat()
+	if err != nil {
+		return err
+	}
+	if fstat.Size() < segmentSizeBytes {
+		return w.sync()
+	}
+	// TODO: add a test for this code path when refactoring the tests
+	return w.cut()
 }
 
 func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {

+ 4 - 4
wal/wal_test.go

@@ -156,7 +156,7 @@ func TestCut(t *testing.T) {
 	if err := w.Save(state, []raftpb.Entry{{}}); err != nil {
 		t.Fatal(err)
 	}
-	if err := w.Cut(); err != nil {
+	if err := w.cut(); err != nil {
 		t.Fatal(err)
 	}
 	wname := walName(1, 1)
@@ -168,7 +168,7 @@ func TestCut(t *testing.T) {
 	if err := w.Save(raftpb.HardState{}, es); err != nil {
 		t.Fatal(err)
 	}
-	if err := w.Cut(); err != nil {
+	if err := w.cut(); err != nil {
 		t.Fatal(err)
 	}
 	snap := walpb.Snapshot{Index: 2, Term: 1}
@@ -335,7 +335,7 @@ func TestRecoverAfterCut(t *testing.T) {
 		if err = md.Save(raftpb.HardState{}, es); err != nil {
 			t.Fatal(err)
 		}
-		if err = md.Cut(); err != nil {
+		if err = md.cut(); err != nil {
 			t.Fatal(err)
 		}
 	}
@@ -427,7 +427,7 @@ func TestOpenNotInUse(t *testing.T) {
 		if err = w.Save(raftpb.HardState{}, es); err != nil {
 			t.Fatal(err)
 		}
-		if err = w.Cut(); err != nil {
+		if err = w.cut(); err != nil {
 			t.Fatal(err)
 		}
 	}