Преглед на файлове

Merge pull request #1587 from xiangli-cmu/fix_wal

wal: sync before returning from create
Xiang Li преди 11 години
родител
ревизия
075ab6415f
променени са 4 файла, в които са добавени 66 реда и са изтрити 31 реда
  1. 37 27
      etcdmain/etcd.go
  2. 6 2
      raft/node.go
  3. 1 0
      wal/wal.go
  4. 22 2
      wal/wal_test.go

+ 37 - 27
etcdmain/etcd.go

@@ -20,6 +20,7 @@ import (
 	"flag"
 	"fmt"
 	"log"
+	"net"
 	"net/http"
 	"os"
 	"strings"
@@ -173,42 +174,22 @@ func startEtcd() {
 	if err != nil {
 		log.Fatal(err.Error())
 	}
-	cfg := &etcdserver.ServerConfig{
-		Name:         *name,
-		ClientURLs:   acurls,
-		DataDir:      *dir,
-		SnapCount:    *snapCount,
-		Cluster:      cls,
-		DiscoveryURL: *durl,
-		ClusterState: *clusterState,
-		Transport:    pt,
-	}
-	s := etcdserver.NewServer(cfg)
-	s.Start()
-
-	ch := &cors.CORSHandler{
-		Handler: etcdhttp.NewClientHandler(s),
-		Info:    corsInfo,
-	}
-	ph := etcdhttp.NewPeerHandler(s)
 
 	lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo)
 	if err != nil {
 		log.Fatal(err.Error())
 	}
 
+	plns := make([]net.Listener, 0)
 	for _, u := range lpurls {
 		l, err := transport.NewListener(u.Host, peerTLSInfo)
 		if err != nil {
 			log.Fatal(err)
 		}
 
-		// Start the peer server in a goroutine
 		urlStr := u.String()
-		go func() {
-			log.Print("etcd: listening for peers on ", urlStr)
-			log.Fatal(http.Serve(l, ph))
-		}()
+		log.Print("etcd: listening for peers on ", urlStr)
+		plns = append(plns, l)
 	}
 
 	lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
@@ -216,7 +197,7 @@ func startEtcd() {
 		log.Fatal(err.Error())
 	}
 
-	// Start a client server goroutine for each listen address
+	clns := make([]net.Listener, 0)
 	for _, u := range lcurls {
 		l, err := transport.NewListener(u.Host, clientTLSInfo)
 		if err != nil {
@@ -224,10 +205,39 @@ func startEtcd() {
 		}
 
 		urlStr := u.String()
-		go func() {
-			log.Print("etcd: listening for client requests on ", urlStr)
+		log.Print("etcd: listening for client requests on ", urlStr)
+		clns = append(clns, l)
+	}
+
+	cfg := &etcdserver.ServerConfig{
+		Name:         *name,
+		ClientURLs:   acurls,
+		DataDir:      *dir,
+		SnapCount:    *snapCount,
+		Cluster:      cls,
+		DiscoveryURL: *durl,
+		ClusterState: *clusterState,
+		Transport:    pt,
+	}
+	s := etcdserver.NewServer(cfg)
+	s.Start()
+
+	ch := &cors.CORSHandler{
+		Handler: etcdhttp.NewClientHandler(s),
+		Info:    corsInfo,
+	}
+	ph := etcdhttp.NewPeerHandler(s)
+	// Start the peer server in a goroutine
+	for _, l := range plns {
+		go func(l net.Listener) {
+			log.Fatal(http.Serve(l, ph))
+		}(l)
+	}
+	// Start a client server goroutine for each listen address
+	for _, l := range clns {
+		go func(l net.Listener) {
 			log.Fatal(http.Serve(l, ch))
-		}()
+		}(l)
 	}
 }
 

+ 6 - 2
raft/node.go

@@ -168,8 +168,12 @@ func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st p
 	if snapshot != nil {
 		r.restore(*snapshot)
 	}
-	r.loadState(st)
-	r.loadEnts(ents)
+	if !isHardStateEqual(st, emptyState) {
+		r.loadState(st)
+	}
+	if len(ents) != 0 {
+		r.loadEnts(ents)
+	}
 	go n.run(r)
 	return &n
 }

+ 1 - 0
wal/wal.go

@@ -97,6 +97,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
 	if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
 		return nil, err
 	}
+	w.Sync()
 	return w, nil
 }
 

+ 22 - 2
wal/wal_test.go

@@ -25,6 +25,7 @@ import (
 	"testing"
 
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/wal/walpb"
 )
 
 func TestNew(t *testing.T) {
@@ -34,14 +35,33 @@ func TestNew(t *testing.T) {
 	}
 	defer os.RemoveAll(p)
 
-	w, err := Create(p, nil)
+	w, err := Create(p, []byte("somedata"))
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
 	if g := path.Base(w.f.Name()); g != walName(0, 0) {
 		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
 	}
-	w.Close()
+	defer w.Close()
+	gd, err := ioutil.ReadFile(w.f.Name())
+	if err != nil {
+		t.Fatalf("err = %v, want nil", err)
+	}
+
+	var wb bytes.Buffer
+	e := newEncoder(&wb, 0)
+	err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
+	if err != nil {
+		t.Fatalf("err = %v, want nil", err)
+	}
+	err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")})
+	if err != nil {
+		t.Fatalf("err = %v, want nil", err)
+	}
+	e.flush()
+	if !reflect.DeepEqual(gd, wb.Bytes()) {
+		t.Errorf("data = %v, want %v", gd, wb.Bytes())
+	}
 }
 
 func TestNewForInitedDir(t *testing.T) {