
Merge branch 'master' of github.com:coreos/etcd

Conflicts:
	server/peer_server_handlers.go
Yicheng Qin, 11 years ago · commit c92231c91a

+ 0 - 10
config/config.go

@@ -142,16 +142,6 @@ func (c *Config) Load(arguments []string) error {
 		return err
 	}
 
-	// Sanitize all the input fields.
-	if err := c.Sanitize(); err != nil {
-		return fmt.Errorf("sanitize: %v", err)
-	}
-
-	// Force remove server configuration if specified.
-	if c.Force {
-		c.Reset()
-	}
-
 	return nil
 }
 
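
Sanitize and the Force reset are not removed outright; they move into Etcd.Run (see the etcd/etcd.go hunk below). One side effect is that Config.Load no longer expands bare host:port addresses into full URLs, which is presumably why the expected values in config_test.go below lose their http:// prefix.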

+ 3 - 3
config/config_test.go

@@ -479,7 +479,7 @@ func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) {
 			c := New()
 			c.SystemPath = p1
 			assert.Nil(t, c.Load([]string{"-config", p2}), "")
-			assert.Equal(t, c.Addr, "http://127.0.0.1:6000", "")
+			assert.Equal(t, c.Addr, "127.0.0.1:6000", "")
 		})
 	})
 }
@@ -494,7 +494,7 @@ func TestConfigEnvVarOverrideCustomConfig(t *testing.T) {
 		c := New()
 		c.SystemPath = ""
 		assert.Nil(t, c.Load([]string{"-config", path}), "")
-		assert.Equal(t, c.Peer.Addr, "http://127.0.0.1:8000", "")
+		assert.Equal(t, c.Peer.Addr, "127.0.0.1:8000", "")
 	})
 }
 
@@ -506,7 +506,7 @@ func TestConfigCLIArgsOverrideEnvVar(t *testing.T) {
 	c := New()
 	c.SystemPath = ""
 	assert.Nil(t, c.Load([]string{"-addr", "127.0.0.1:2000"}), "")
-	assert.Equal(t, c.Addr, "http://127.0.0.1:2000", "")
+	assert.Equal(t, c.Addr, "127.0.0.1:2000", "")
 }
 
 //--------------------------------------

+ 10 - 0
etcd/etcd.go

@@ -70,6 +70,16 @@ func New(c *config.Config) *Etcd {
 
 // Run the etcd instance.
 func (e *Etcd) Run() {
+	// Sanitize all the input fields.
+	if err := e.Config.Sanitize(); err != nil {
+		log.Fatalf("failed sanitizing configuration: %v", err)
+	}
+
+	// Force remove server configuration if specified.
+	if e.Config.Force {
+		e.Config.Reset()
+	}
+
 	// Enable options.
 	if e.Config.VeryVeryVerbose {
 		log.Verbose = true
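
Since Run now sanitizes the configuration and applies Force itself, a caller only needs to load a config and hand it over. A minimal sketch of that flow, assuming a main package and the standard log package (hypothetical, assembled from the calls this commit exercises in tests/server_utils.go):

	c := config.New()
	if err := c.Load(os.Args[1:]); err != nil {
		log.Fatalf("failed loading configuration: %v", err)
	}
	e := etcd.New(c)
	go e.Run()        // sanitizes the config, applies Force, then serves
	<-e.ReadyNotify() // block until the instance is ready to serve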

+ 0 - 4
etcd/etcd_test.go

@@ -34,10 +34,6 @@ func TestRunStop(t *testing.T) {
 	config.Addr = "localhost:0"
 	config.Peer.Addr = "localhost:0"
 
-	if err := config.Sanitize(); err != nil {
-		t.Fatal(err)
-	}
-
 	etcd := New(config)
 	go etcd.Run()
 	<-etcd.ReadyNotify()

+ 4 - 0
http/cors.go

@@ -59,6 +59,10 @@ func (h *CORSHandler) addHeader(w http.ResponseWriter, origin string) {
 // ServeHTTP adds the correct CORS headers based on the origin and returns immediately
 // with a 200 OK if the method is OPTIONS.
 func (h *CORSHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	// It is important to flush before the handler goroutine returns,
+	// or the most recently written data may never reach the client.
+	defer w.(http.Flusher).Flush()
+
 	// Write CORS header.
 	if h.Info.OriginAllowed("*") {
 		h.addHeader(w, "*")
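
The deferred w.(http.Flusher).Flush() is an unchecked type assertion: it assumes every ResponseWriter passed in implements http.Flusher (net/http's default one does) and panics otherwise. A defensive variant of the same idea, as a standalone sketch:

	func handle(w http.ResponseWriter, r *http.Request) {
		if f, ok := w.(http.Flusher); ok {
			defer f.Flush() // push buffered bytes to the client before returning
		}
		w.Write([]byte("ok"))
	}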

+ 1 - 1
server/peer_server.go

@@ -696,8 +696,8 @@ func (s *PeerServer) monitorActiveSize() {
 
 		// Retrieve target active size and actual active size.
 		activeSize := s.ClusterConfig().ActiveSize
-		peerCount := s.registry.Count()
 		peers := s.registry.Names()
+		peerCount := s.registry.Count()
 		if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
 			peers = append(peers[:index], peers[index+1:]...)
 		}
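
Names and Count are still two separate registry reads, so they can disagree if membership changes between the calls. A hypothetical alternative (not what this commit does) would derive both from one snapshot:

	peers := s.registry.Names()
	peerCount := len(peers) // count and names from the same snapshot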

+ 12 - 4
server/peer_server_handlers.go

@@ -223,8 +223,9 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
 // Retrieves a list of peers and standbys.
 func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	machines := make([]*machineMessage, 0)
+	leader := ps.raftServer.Leader()
 	for _, name := range ps.registry.Names() {
-		if msg := ps.getMachineMessage(name); msg != nil {
+		if msg := ps.getMachineMessage(name, leader); msg != nil {
 			machines = append(machines, msg)
 		}
 	}
@@ -234,21 +235,27 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re
 // Retrieve single peer or standby.
 func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
 	vars := mux.Vars(req)
-	json.NewEncoder(w).Encode(ps.getMachineMessage(vars["name"]))
+	m := ps.getMachineMessage(vars["name"], ps.raftServer.Leader())
+	json.NewEncoder(w).Encode(m)
 }
 
-func (ps *PeerServer) getMachineMessage(name string) *machineMessage {
+func (ps *PeerServer) getMachineMessage(name string, leader string) *machineMessage {
 	if !ps.registry.Exists(name) {
 		return nil
 	}
 
 	clientURL, _ := ps.registry.ClientURL(name)
 	peerURL, _ := ps.registry.PeerURL(name)
-	return &machineMessage{
+	msg := &machineMessage{
 		Name:      name,
+		State:     raft.Follower,
 		ClientURL: clientURL,
 		PeerURL:   peerURL,
 	}
+	if name == leader {
+		msg.State = raft.Leader
+	}
+	return msg
 }
 
 // Response to the name request
@@ -300,6 +307,7 @@ func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Reques
 // machineMessage represents information about a peer or standby in the registry.
 type machineMessage struct {
 	Name      string `json:"name"`
+	State     string `json:"state"`
 	ClientURL string `json:"clientURL"`
 	PeerURL   string `json:"peerURL"`
 }
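
With the new state field, every entry returned by the machines endpoints reports whether that machine is currently the raft leader. A client-side sketch of consuming it (the URL follows the functional test below; the struct mirrors machineMessage's JSON tags):

	resp, err := http.Get("http://localhost:7001/v2/admin/machines")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var machines []struct {
		Name      string `json:"name"`
		State     string `json:"state"` // raft.Leader for the leader, raft.Follower otherwise
		ClientURL string `json:"clientURL"`
		PeerURL   string `json:"peerURL"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&machines); err != nil {
		log.Fatal(err)
	}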

+ 3 - 1
server/raft_server_stats.go

@@ -33,7 +33,7 @@ type raftServerStats struct {
 }
 
 func NewRaftServerStats(name string) *raftServerStats {
-	return &raftServerStats{
+	stats := &raftServerStats{
 		Name:      name,
 		StartTime: time.Now(),
 		sendRateQueue: &statsQueue{
@@ -43,6 +43,8 @@ func NewRaftServerStats(name string) *raftServerStats {
 			back: -1,
 		},
 	}
+	stats.LeaderInfo.startTime = time.Now()
+	return stats
 }
 
 func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
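
Without this line, LeaderInfo.startTime keeps Go's zero time.Time, so any uptime derived from it before the first leader change would span centuries. A sketch of the kind of package-internal calculation this fixes (hypothetical usage):

	uptime := time.Since(stats.LeaderInfo.startTime) // sane now that startTime is initialized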

+ 11 - 13
server/registry.go

@@ -38,19 +38,6 @@ func NewRegistry(s store.Store) *Registry {
 	}
 }
 
-// Names returns a list of cached peer names.
-func (r *Registry) Names() []string {
-	r.Lock()
-	defer r.Unlock()
-
-	names := make([]string, 0, len(r.peers))
-	for name := range r.peers {
-		names = append(names, name)
-	}
-	sort.Sort(sort.StringSlice(names))
-	return names
-}
-
 // Register adds a peer to the registry.
 func (r *Registry) Register(name string, peerURL string, machURL string) error {
 	// Write data to store.
@@ -167,6 +154,17 @@ func (r *Registry) UpdatePeerURL(name string, peerURL string) error {
 	return nil
 }
 
+func (r *Registry) name(key, name string) (string, bool) {
+	return name, true
+}
+
+// Names returns a list of cached peer names.
+func (r *Registry) Names() []string {
+	names := r.urls(RegistryKey, "", "", r.name)
+	sort.Sort(sort.StringSlice(names))
+	return names
+}
+
 // Retrieves the Client URLs for all nodes.
 func (r *Registry) ClientURLs(leaderName, selfName string) []string {
 	return r.urls(RegistryKey, leaderName, selfName, r.clientURL)
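
Names is now rebuilt from the same underlying traversal (r.urls) the URL accessors use, via a trivial extractor that echoes each entry's name, rather than from the separately locked peers cache. The accessors differ only in the callback they hand to r.urls:

	names := r.urls(RegistryKey, "", "", r.name)                      // Names
	clients := r.urls(RegistryKey, leaderName, selfName, r.clientURL) // ClientURLs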

+ 7 - 0
server/v2/tests/get_handler_test.go

@@ -90,6 +90,13 @@ func TestV2GetKeyRecursively(t *testing.T) {
 //
 func TestV2WatchKey(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
+		// There is a small gap between etcd being ready to serve and it
+		// actually serving its first request, so the first response can be
+		// delayed longer than usual.
+		// This test is time-sensitive, so make one request up front to
+		// ensure the server is actually serving.
+		tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"))
+
 		var watchResp *http.Response
 		c := make(chan bool)
 		go func() {

+ 23 - 0
tests/functional/cluster_config_test.go

@@ -2,6 +2,7 @@ package test
 
 import (
 	"bytes"
+	"encoding/json"
 	"os"
 	"testing"
 	"time"
@@ -27,3 +28,25 @@ func TestClusterConfig(t *testing.T) {
 	assert.Equal(t, body["activeSize"], 3)
 	assert.Equal(t, body["promoteDelay"], 60)
 }
+
+// TestGetMachines tests that '/v2/admin/machines' sends back messages for all machines.
+func TestGetMachines(t *testing.T) {
+	_, etcds, err := CreateCluster(3, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
+	assert.NoError(t, err)
+	defer DestroyCluster(etcds)
+
+	time.Sleep(1 * time.Second)
+
+	resp, err := tests.Get("http://localhost:7001/v2/admin/machines")
+	if !assert.Equal(t, err, nil) {
+		t.FailNow()
+	}
+	assert.Equal(t, resp.StatusCode, 200)
+	machines := make([]map[string]interface{}, 0)
+	b := tests.ReadBody(resp)
+	json.Unmarshal(b, &machines)
+	assert.Equal(t, len(machines), 3)
+	if machines[0]["state"] != "leader" && machines[1]["state"] != "leader" && machines[2]["state"] != "leader" {
+		t.Errorf("no leader in the cluster")
+	}
+}

+ 2 - 2
tests/functional/simple_snapshot_test.go

@@ -56,7 +56,7 @@ func TestSnapshot(t *testing.T) {
 
 	index, _ := strconv.Atoi(snapshots[0].Name()[2:5])
 
-	if index < 507 || index > 515 {
+	if index < 503 || index > 515 {
 		t.Fatal("wrong name of snapshot :", snapshots[0].Name())
 	}
 
@@ -89,7 +89,7 @@ func TestSnapshot(t *testing.T) {
 
 	index, _ = strconv.Atoi(snapshots[0].Name()[2:6])
 
-	if index < 1014 || index > 1025 {
+	if index < 1010 || index > 1025 {
 		t.Fatal("wrong name of snapshot :", snapshots[0].Name())
 	}
 }

+ 20 - 92
tests/server_utils.go

@@ -1,17 +1,9 @@
 package tests
 
 import (
-	"io/ioutil"
-	"net/http"
-	"os"
-	"sync"
-	"time"
-
-	"github.com/coreos/etcd/third_party/github.com/goraft/raft"
-
-	"github.com/coreos/etcd/metrics"
+	"github.com/coreos/etcd/config"
+	"github.com/coreos/etcd/etcd"
 	"github.com/coreos/etcd/server"
-	"github.com/coreos/etcd/store"
 )
 
 const (
@@ -19,94 +11,30 @@ const (
 	testClientURL         = "localhost:4401"
 	testRaftURL           = "localhost:7701"
 	testSnapshotCount     = 10000
-	testHeartbeatInterval = time.Duration(50) * time.Millisecond
-	testElectionTimeout   = time.Duration(200) * time.Millisecond
+	testHeartbeatInterval = 50
+	testElectionTimeout   = 200
+	testDataDir           = "/tmp/ETCDTEST"
 )
 
-// Starts a server in a temporary directory.
+// Starts a new server.
 func RunServer(f func(*server.Server)) {
-	path, _ := ioutil.TempDir("", "etcd-")
-	defer os.RemoveAll(path)
-
-	store := store.New()
-	registry := server.NewRegistry(store)
-
-	serverStats := server.NewRaftServerStats(testName)
-	followersStats := server.NewRaftFollowersStats(testName)
-
-	psConfig := server.PeerServerConfig{
-		Name:          testName,
-		URL:           "http://" + testRaftURL,
-		Scheme:        "http",
-		SnapshotCount: testSnapshotCount,
-	}
-
-	mb := metrics.NewBucket("")
-
-	ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats)
-	psListener := server.NewListener("http", testRaftURL, nil)
-
-	// Create Raft transporter and server
-	dialTimeout := (3 * testHeartbeatInterval) + testElectionTimeout
-	responseHeaderTimeout := (3 * testHeartbeatInterval) + testElectionTimeout
-	raftTransporter := server.NewTransporter(followersStats, serverStats, registry, testHeartbeatInterval, dialTimeout, responseHeaderTimeout)
-	raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "")
-	if err != nil {
-		panic(err)
-	}
-	raftServer.SetElectionTimeout(testElectionTimeout)
-	raftServer.SetHeartbeatInterval(testHeartbeatInterval)
-	ps.SetRaftServer(raftServer)
-
-	s := server.New(testName, "http://"+testClientURL, ps, registry, store, nil)
-	sListener := server.NewListener("http", testClientURL, nil)
+	c := config.New()
 
-	ps.SetServer(s)
+	c.Name = testName
+	c.Addr = testClientURL
+	c.Peer.Addr = testRaftURL
 
-	w := &sync.WaitGroup{}
+	c.DataDir = testDataDir
+	c.Force = true
 
-	// Start up peer server.
-	c := make(chan bool)
-	go func() {
-		c <- true
-		ps.Start(false, "", []string{})
-		h := waitHandler{w, ps.HTTPHandler()}
-		http.Serve(psListener, &h)
-	}()
-	<-c
-
-	// Start up etcd server.
-	go func() {
-		c <- true
-		h := waitHandler{w, s.HTTPHandler()}
-		http.Serve(sListener, &h)
-	}()
-	<-c
-
-	// Wait to make sure servers have started.
-	time.Sleep(50 * time.Millisecond)
+	c.Peer.HeartbeatInterval = testHeartbeatInterval
+	c.Peer.ElectionTimeout = testElectionTimeout
+	c.SnapshotCount = testSnapshotCount
 
+	i := etcd.New(c)
+	go i.Run()
+	<-i.ReadyNotify()
 	// Execute the function passed in.
-	f(s)
-
-	// Clean up servers.
-	ps.Stop()
-	psListener.Close()
-	sListener.Close()
-	w.Wait()
-}
-
-type waitHandler struct {
-	wg      *sync.WaitGroup
-	handler http.Handler
-}
-
-func (h *waitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	h.wg.Add(1)
-	defer h.wg.Done()
-	h.handler.ServeHTTP(w, r)
-
-	//important to flush before decrementing the wait group.
-	//we won't get a chance to once main() ends.
-	w.(http.Flusher).Flush()
+	f(i.Server)
+	i.Stop()
 }
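
RunServer now boots a complete etcd instance through the same path production uses, instead of hand-wiring the peer server, raft transport, and listeners, and it tears the instance down with i.Stop() once f returns. Test call sites keep the same shape, e.g. (mirroring the watch test above):

	tests.RunServer(func(s *server.Server) {
		resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"))
		defer resp.Body.Close()
		// assert on resp.StatusCode, the body, etc.
	})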