Browse Source

Merge https://github.com/coreos/etcd into fix_expire_notify

Xiang Li 12 years ago
parent
commit
f83e76eb60

+ 5 - 0
.travis.yml

@@ -6,3 +6,8 @@ install:
 
 script:
  - ./test.sh
+
+# temporarily fix Travis
+env:
+  global:
+      - TRAVIS_BUILD_DIR=/home/travis/build/coreos/etcd

+ 1 - 0
Dockerfile

@@ -8,3 +8,4 @@ ADD . /opt/etcd
 RUN cd /opt/etcd && ./build
 EXPOSE 4001 7001
 ENTRYPOINT ["/opt/etcd/etcd"]
+

+ 7 - 10
etcd.go

@@ -19,6 +19,7 @@ package main
 import (
 	"fmt"
 	"os"
+	"time"
 
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/server"
@@ -52,16 +53,6 @@ func main() {
 		profile(config.CPUProfileFile)
 	}
 
-	// Only guess the machine name if there is no data dir specified
-	// because the info file will should have our name
-	if config.Name == "" && config.DataDir == "" {
-		config.NameFromHostname()
-	}
-
-	if config.DataDir == "" && config.Name != "" {
-		config.DataDirFromName()
-	}
-
 	if config.DataDir == "" {
 		log.Fatal("The data dir was not set and could not be guessed from machine name")
 	}
@@ -95,6 +86,12 @@ func main() {
 	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount)
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.RetryTimes = config.MaxRetryAttempts
+	if config.HeartbeatTimeout > 0 {
+		ps.HeartbeatTimeout = time.Duration(config.HeartbeatTimeout) * time.Millisecond
+	}
+	if config.ElectionTimeout > 0 {
+		ps.ElectionTimeout = time.Duration(config.ElectionTimeout) * time.Millisecond
+	}
 
 	// Create client server.
 	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store)

+ 37 - 13
server/config.go

@@ -67,7 +67,8 @@ type Config struct {
 	ShowVersion      bool
 	Verbose          bool `toml:"verbose" env:"ETCD_VERBOSE"`
 	VeryVerbose      bool `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
-
+	HeartbeatTimeout int  `toml:"peer_heartbeat_timeout" env:"ETCD_PEER_HEARTBEAT_TIMEOUT"`
+	ElectionTimeout  int  `toml:"peer_election_timeout" env:"ETCD_PEER_ELECTION_TIMEOUT"`
 	Peer struct {
 		Addr     string `toml:"addr" env:"ETCD_PEER_ADDR"`
 		BindAddr string `toml:"bind_addr" env:"ETCD_PEER_BIND_ADDR"`
@@ -87,6 +88,8 @@ func NewConfig() *Config {
 	c.MaxRetryAttempts = 3
 	c.Peer.Addr = "127.0.0.1:7001"
 	c.SnapshotCount = 10000
+	c.ElectionTimeout = 0
+	c.HeartbeatTimeout = 0
 	return c
 }
 
@@ -131,6 +134,11 @@ func (c *Config) Load(arguments []string) error {
 		return fmt.Errorf("sanitize: %v", err)
 	}
 
+	// Force remove server configuration if specified.
+	if c.Force {
+		c.Reset()
+	}
+
 	return nil
 }
 
@@ -228,6 +236,9 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "")
 	f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "")
 	f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "")
+	f.IntVar(&c.HeartbeatTimeout, "peer-heartbeat-timeout", c.HeartbeatTimeout, "")
+	f.IntVar(&c.ElectionTimeout, "peer-election-timeout", c.ElectionTimeout, "")
+
 	f.StringVar(&cors, "cors", "", "")
 
 	f.BoolVar(&c.Snapshot, "snapshot", c.Snapshot, "")
@@ -278,11 +289,6 @@ func (c *Config) LoadFlags(arguments []string) error {
 		c.CorsOrigins = trimsplit(cors, ",")
 	}
 
-	// Force remove server configuration if specified.
-	if c.Force {
-		c.Reset()
-	}
-
 	return nil
 }
 
@@ -404,6 +410,16 @@ func (c *Config) Sanitize() error {
 		return fmt.Errorf("Peer Listen Host: %s", err)
 	}
 
+	// Only guess the machine name if there is no data dir specified
+	// because the info file should have our name
+	if c.Name == "" && c.DataDir == "" {
+		c.NameFromHostname()
+	}
+
+	if c.DataDir == "" && c.Name != "" {
+		c.DataDirFromName()
+	}
+
 	return nil
 }
 
@@ -435,7 +451,7 @@ func (c *Config) PeerTLSConfig() (TLSConfig, error) {
 	return c.PeerTLSInfo().Config()
 }
 
-// sanitizeURL will cleanup a host string in the format hostname:port and
+// sanitizeURL will cleanup a host string in the format hostname[:port] and
 // attach a schema.
 func sanitizeURL(host string, defaultScheme string) (string, error) {
 	// Blank URLs are fine input, just return it
@@ -466,14 +482,22 @@ func sanitizeBindAddr(bindAddr string, addr string) (string, error) {
 		return "", err
 	}
 
-	ahost, aport, err := net.SplitHostPort(aurl.Host)
-	if err != nil {
-		return "", err
+	// If it is a valid host:port simply return with no further checks.
+	bhost, bport, err := net.SplitHostPort(bindAddr)
+	if err == nil && bhost != "" {
+		return bindAddr, nil
+	}
+
+	// SplitHostPort makes the host optional, but we don't want that.
+	if bhost == "" && bport != "" {
+		return "", fmt.Errorf("IP required can't use a port only")
 	}
 
-	// If the listen host isn't set use the advertised host
-	if bindAddr == "" {
-		bindAddr = ahost
+	// bindAddr doesn't have a port if we reach here so take the port from the
+	// advertised URL.
+	_, aport, err := net.SplitHostPort(aurl.Host)
+	if err != nil {
+		return "", err
 	}
 
 	return net.JoinHostPort(bindAddr, aport), nil

+ 41 - 0
server/config_test.go

@@ -223,6 +223,29 @@ func TestConfigBindAddrFlag(t *testing.T) {
 	assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
 }
 
+// Ensures that a the Listen Host port overrides the advertised port
+func TestConfigBindAddrOverride(t *testing.T) {
+	c := NewConfig()
+	assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1:4010"}), "")
+	assert.Nil(t, c.Sanitize())
+	assert.Equal(t, c.BindAddr, "127.0.0.1:4010", "")
+}
+
+// Ensures that a the Listen Host inherits its port from the advertised addr
+func TestConfigBindAddrInheritPort(t *testing.T) {
+	c := NewConfig()
+	assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", "127.0.0.1"}), "")
+	assert.Nil(t, c.Sanitize())
+	assert.Equal(t, c.BindAddr, "127.0.0.1:4009", "")
+}
+
+// Ensures that a port only argument errors out
+func TestConfigBindAddrErrorOnNoHost(t *testing.T) {
+	c := NewConfig()
+	assert.Nil(t, c.LoadFlags([]string{"-addr", "127.0.0.1:4009", "-bind-addr", ":4010"}), "")
+	assert.Error(t, c.Sanitize())
+}
+
 // Ensures that the peers can be parsed from the environment.
 func TestConfigPeersEnv(t *testing.T) {
 	withEnv("ETCD_PEERS", "coreos.com:4001,coreos.com:4002", func(c *Config) {
@@ -313,6 +336,24 @@ func TestConfigNameFlag(t *testing.T) {
 	assert.Equal(t, c.Name, "test-name", "")
 }
 
+// Ensures that a Name gets guessed if not specified
+func TestConfigNameGuess(t *testing.T) {
+	c := NewConfig()
+	assert.Nil(t, c.LoadFlags([]string{}), "")
+	assert.Nil(t, c.Sanitize())
+	name, _ := os.Hostname()
+	assert.Equal(t, c.Name, name, "")
+}
+
+// Ensures that a DataDir gets guessed if not specified
+func TestConfigDataDirGuess(t *testing.T) {
+	c := NewConfig()
+	assert.Nil(t, c.LoadFlags([]string{}), "")
+	assert.Nil(t, c.Sanitize())
+	name, _ := os.Hostname()
+	assert.Equal(t, c.DataDir, name+".etcd", "")
+}
+
 // Ensures that Snapshot can be parsed from the environment.
 func TestConfigSnapshotEnv(t *testing.T) {
 	withEnv("ETCD_SNAPSHOT", "1", func(c *Config) {

+ 10 - 4
server/peer_server.go

@@ -20,6 +20,8 @@ import (
 	"github.com/gorilla/mux"
 )
 
+const retryInterval = 10
+
 type PeerServer struct {
 	raftServer     raft.Server
 	server         *Server
@@ -38,6 +40,8 @@ type PeerServer struct {
 	snapConf       *snapshotConf
 	MaxClusterSize int
 	RetryTimes     int
+	HeartbeatTimeout time.Duration
+	ElectionTimeout  time.Duration
 }
 
 // TODO: find a good policy to do snapshot
@@ -76,6 +80,8 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 				back: -1,
 			},
 		},
+		HeartbeatTimeout: defaultHeartbeatTimeout,
+		ElectionTimeout: defaultElectionTimeout,
 	}
 
 	// Create transporter for raft
@@ -105,8 +111,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		}
 	}
 
-	s.raftServer.SetElectionTimeout(ElectionTimeout)
-	s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
+	s.raftServer.SetElectionTimeout(s.ElectionTimeout)
+	s.raftServer.SetHeartbeatTimeout(s.HeartbeatTimeout)
 
 	s.raftServer.Start()
 
@@ -228,8 +234,8 @@ func (s *PeerServer) startAsFollower(cluster []string) {
 		if ok {
 			return
 		}
-		log.Warnf("cannot join to cluster via given peers, retry in %d seconds", RetryInterval)
-		time.Sleep(time.Second * RetryInterval)
+		log.Warnf("cannot join to cluster via given peers, retry in %d seconds", retryInterval)
+		time.Sleep(time.Second * retryInterval)
 	}
 
 	log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes)

+ 3 - 5
server/timeout.go

@@ -5,11 +5,9 @@ import (
 )
 
 const (
-	// The amount of time to elapse without a heartbeat before becoming a candidate.
-	ElectionTimeout = 200 * time.Millisecond
+	// The amount of time to elapse without a heartbeat before becoming a candidate
+	defaultElectionTimeout = 200 * time.Millisecond
 
 	// The frequency by which heartbeats are sent to followers.
-	HeartbeatTimeout = 50 * time.Millisecond
-
-	RetryInterval = 10
+	defaultHeartbeatTimeout = 50 * time.Millisecond
 )

+ 10 - 21
server/transporter.go

@@ -13,20 +13,6 @@ import (
 	"github.com/coreos/raft"
 )
 
-// Timeout for setup internal raft http connection
-// This should not exceed 3 * RTT
-var dailTimeout = 3 * HeartbeatTimeout
-
-// Timeout for setup internal raft http connection + receive all post body
-// The raft server will not send back response header until it received all the
-// post body.
-// This should not exceed dailTimeout + electionTimeout
-var responseHeaderTimeout = 3*HeartbeatTimeout + ElectionTimeout
-
-// Timeout for receiving the response body from the server
-// This should not exceed heartbeatTimeout
-var tranTimeout = HeartbeatTimeout
-
 // Transporter layer for communication between raft nodes
 type transporter struct {
 	client     *http.Client
@@ -34,14 +20,22 @@ type transporter struct {
 	peerServer *PeerServer
 }
 
+type dialer func(network, addr string) (net.Conn, error)
+
 // Create transporter using by raft server
 // Create http or https transporter based on
 // whether the user give the server cert and key
 func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
+	// names for each type of timeout, for the sake of clarity
+	dialTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
+	responseHeaderTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
+
 	t := transporter{}
 
 	tr := &http.Transport{
-		Dial: dialWithTimeout,
+		Dial: func(network, addr string) (net.Conn, error) {
+			return net.DialTimeout(network, addr, dialTimeout)
+		},
 		ResponseHeaderTimeout: responseHeaderTimeout,
 	}
 
@@ -57,11 +51,6 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *
 	return &t
 }
 
-// Dial with timeout
-func dialWithTimeout(network, addr string) (net.Conn, error) {
-	return net.DialTimeout(network, addr, dailTimeout)
-}
-
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
 	var b bytes.Buffer
@@ -238,7 +227,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
 // Cancel the on fly HTTP transaction when timeout happens.
 func (t *transporter) CancelWhenTimeout(req *http.Request) {
 	go func() {
-		time.Sleep(tranTimeout)
+		time.Sleep(t.peerServer.HeartbeatTimeout)
 		t.transport.CancelRequest(req)
 	}()
 }

+ 10 - 6
server/usage.go

@@ -31,18 +31,22 @@ Cluster Configuration Options:
                                   should match the peer's '-peer-addr' flag.
 
 Client Communication Options:
-  -addr=<host:port>   The public host:port used for client communication.
-  -bind-addr=<host>   The listening hostname used for client communication.
-  -ca-file=<path>     Path to the client CA file.
-  -cert-file=<path>   Path to the client cert file.
-  -key-file=<path>    Path to the client key file.
+  -addr=<host:port>         The public host:port used for client communication.
+  -bind-addr=<host[:port]>  The listening host:port used for client communication.
+  -ca-file=<path>           Path to the client CA file.
+  -cert-file=<path>         Path to the client cert file.
+  -key-file=<path>          Path to the client key file.
 
 Peer Communication Options:
   -peer-addr=<host:port>  The public host:port used for peer communication.
-  -peer-bind-addr=<host>  The listening hostname used for peer communication.
+  -peer-bind-addr=<host[:port]>  The listening host:port used for peer communication.
   -peer-ca-file=<path>    Path to the peer CA file.
   -peer-cert-file=<path>  Path to the peer cert file.
   -peer-key-file=<path>   Path to the peer key file.
+  -peer-heartbeat-timeout=<time>
+                          Time (in milliseconds) for a heartbeat to timeout.
+  -peer-election-timeout=<time>
+                          Time (in milliseconds) for an election to timeout.
 
 Other Options:
   -max-result-buffer   Max size of the result buffer.

+ 12 - 3
server/v2/get_handler.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"net/url"
 	"strconv"
 
 	etcdErr "github.com/coreos/etcd/error"
@@ -24,9 +25,17 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
 		leader := s.Leader()
 		hostname, _ := s.ClientURL(leader)
-		url := hostname + req.URL.Path
-		log.Debugf("Redirect consistent get to %s", url)
-		http.Redirect(w, req, url, http.StatusTemporaryRedirect)
+
+		url, err := url.Parse(hostname)
+		if err != nil {
+			log.Warn("Redirect cannot parse hostName ", hostname)
+			return err
+		}
+		url.RawQuery = req.URL.RawQuery
+		url.Path = req.URL.Path
+
+		log.Debugf("Redirect consistent get to %s", url.String())
+		http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect)
 		return nil
 	}
 

+ 1 - 1
server/v2/tests/delete_handler_test.go

@@ -24,7 +24,7 @@ func TestV2DeleteKey(t *testing.T) {
 		resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
 		body := tests.ReadBody(resp)
 		assert.Nil(t, err, "")
-		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","prevValue":"XXX","modifiedIndex":3,"createdIndex":2}}`, "")
+		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2}}`, "")
 	})
 }
 

+ 0 - 5
server/v2/tests/put_handler_test.go

@@ -131,9 +131,6 @@ func TestV2UpdateKeySuccess(t *testing.T) {
 		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "update", "")
-
-		node := body["node"].(map[string]interface{})
-		assert.Equal(t, node["prevValue"], "XXX", "")
 	})
 }
 
@@ -192,7 +189,6 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "compareAndSwap", "")
 		node := body["node"].(map[string]interface{})
-		assert.Equal(t, node["prevValue"], "XXX", "")
 		assert.Equal(t, node["value"], "YYY", "")
 		assert.Equal(t, node["modifiedIndex"], 3, "")
 	})
@@ -254,7 +250,6 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "compareAndSwap", "")
 		node := body["node"].(map[string]interface{})
-		assert.Equal(t, node["prevValue"], "XXX", "")
 		assert.Equal(t, node["value"], "YYY", "")
 		assert.Equal(t, node["modifiedIndex"], 3, "")
 	})

+ 1 - 1
store/node_extern.go

@@ -10,7 +10,7 @@ import (
 // TTL is time to live in second
 type NodeExtern struct {
 	Key           string      `json:"key, omitempty"`
-	PrevValue     string      `json:"prevValue,omitempty"`
+	PrevValue     string      `json:"-"`
 	Value         string      `json:"value,omitempty"`
 	Dir           bool        `json:"dir,omitempty"`
 	Expiration    *time.Time  `json:"expiration,omitempty"`

+ 1 - 1
tests/functional/simple_multi_node_test.go

@@ -54,7 +54,7 @@ func templateTestSimpleMultiNode(t *testing.T, tls bool) {
 	result, err = c.Set("foo", "bar", 100)
 	node = result.Node
 
-	if err != nil || node.Key != "/foo" || node.Value != "bar" || node.PrevValue != "bar" || node.TTL < 95 {
+	if err != nil || node.Key != "/foo" || node.Value != "bar" || node.TTL < 95 {
 		if err != nil {
 			t.Fatal(err)
 		}

+ 2 - 2
tests/functional/single_node_test.go

@@ -43,7 +43,7 @@ func TestSingleNode(t *testing.T) {
 	result, err = c.Set("foo", "bar", 100)
 	node = result.Node
 
-	if err != nil || node.Key != "/foo" || node.Value != "bar" || node.PrevValue != "bar" || node.TTL != 100 {
+	if err != nil || node.Key != "/foo" || node.Value != "bar" || node.TTL != 100 {
 		if err != nil {
 			t.Fatal("Set 2: ", err)
 		}
@@ -56,7 +56,7 @@ func TestSingleNode(t *testing.T) {
 	result, err = c.CompareAndSwap("foo", "foobar", 100, "bar", 0)
 	node = result.Node
 
-	if err != nil || node.Key != "/foo" || node.Value != "foobar" || node.PrevValue != "bar" || node.TTL != 100 {
+	if err != nil || node.Key != "/foo" || node.Value != "foobar" || node.TTL != 100 {
 		if err != nil {
 			t.Fatal(err)
 		}

+ 9 - 4
tests/server_utils.go

@@ -10,10 +10,12 @@ import (
 )
 
 const (
-	testName          = "ETCDTEST"
-	testClientURL     = "localhost:4401"
-	testRaftURL       = "localhost:7701"
-	testSnapshotCount = 10000
+	testName             = "ETCDTEST"
+	testClientURL        = "localhost:4401"
+	testRaftURL          = "localhost:7701"
+	testSnapshotCount    = 10000
+	testHeartbeatTimeout = 50
+	testElectionTimeout  = 200
 )
 
 // Starts a server in a temporary directory.
@@ -23,8 +25,11 @@ func RunServer(f func(*server.Server)) {
 
 	store := store.New()
 	registry := server.NewRegistry(store)
+
 	ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount)
 	ps.MaxClusterSize = 9
+	ps.ElectionTimeout = testElectionTimeout
+	ps.HeartbeatTimeout = testHeartbeatTimeout
 	s := server.New(testName, "http://" + testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	ps.SetServer(s)