Browse Source

Merge pull request #1213 from philips/improve-cluster

etcdserver: stop using addScheme
Brandon Philips 11 years ago
parent
commit
7639752c82

+ 4 - 4
etcdserver/cluster.go

@@ -59,7 +59,7 @@ func (c Cluster) Pick(id int64) string {
 }
 }
 
 
 // Set parses command line sets of names to IPs formatted like:
 // Set parses command line sets of names to IPs formatted like:
-// mach0=1.1.1.1,mach0=2.2.2.2,mach0=1.1.1.1,mach1=2.2.2.2,mach1=3.3.3.3
+// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
 func (c *Cluster) Set(s string) error {
 func (c *Cluster) Set(s string) error {
 	*c = Cluster{}
 	*c = Cluster{}
 	v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
 	v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
@@ -71,7 +71,7 @@ func (c *Cluster) Set(s string) error {
 		if len(urls) == 0 || urls[0] == "" {
 		if len(urls) == 0 || urls[0] == "" {
 			return fmt.Errorf("Empty URL given for %q", name)
 			return fmt.Errorf("Empty URL given for %q", name)
 		}
 		}
-		m := newMember(name, urls)
+		m := newMember(name, urls, nil)
 		err := c.Add(*m)
 		err := c.Add(*m)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
@@ -106,7 +106,7 @@ func (c Cluster) PeerURLs() []string {
 	endpoints := make([]string, 0)
 	endpoints := make([]string, 0)
 	for _, p := range c {
 	for _, p := range c {
 		for _, addr := range p.PeerURLs {
 		for _, addr := range p.PeerURLs {
-			endpoints = append(endpoints, addScheme(addr))
+			endpoints = append(endpoints, addr)
 		}
 		}
 	}
 	}
 	sort.Strings(endpoints)
 	sort.Strings(endpoints)
@@ -120,7 +120,7 @@ func (c Cluster) ClientURLs() []string {
 	urls := make([]string, 0)
 	urls := make([]string, 0)
 	for _, p := range c {
 	for _, p := range c {
 		for _, url := range p.ClientURLs {
 		for _, url := range p.ClientURLs {
-			urls = append(urls, addScheme(url))
+			urls = append(urls, url)
 		}
 		}
 	}
 	}
 	sort.Strings(urls)
 	sort.Strings(urls)

+ 0 - 6
etcdserver/cluster_store.go

@@ -75,12 +75,6 @@ func (s *clusterStore) Delete(id int64) {
 	}
 	}
 }
 }
 
 
-// addScheme adds the protocol prefix to a string; currently only HTTP
-// TODO: improve this when implementing TLS
-func addScheme(addr string) string {
-	return fmt.Sprintf("http://%s", addr)
-}
-
 func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
 func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
 	c := &http.Client{Transport: t}
 	c := &http.Client{Transport: t}
 
 

+ 17 - 17
etcdserver/cluster_test.go

@@ -77,11 +77,11 @@ func TestClusterSet(t *testing.T) {
 		parse bool
 		parse bool
 	}{
 	}{
 		{
 		{
-			"mem1=10.0.0.1:2379,mem1=128.193.4.20:2379,mem2=10.0.0.2:2379,default=127.0.0.1:2379",
+			"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
 			[]Member{
 			[]Member{
-				{ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"10.0.0.1:2379", "128.193.4.20:2379"}},
-				{ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"10.0.0.2:2379"}},
-				{ID: 2676999861503984872, Name: "default", PeerURLs: []string{"127.0.0.1:2379"}},
+				{ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}},
+				{ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"http://10.0.0.2:2379"}},
+				{ID: 2676999861503984872, Name: "default", PeerURLs: []string{"http://127.0.0.1:2379"}},
 			},
 			},
 			true,
 			true,
 		},
 		},
@@ -104,10 +104,10 @@ func TestClusterSet(t *testing.T) {
 
 
 func TestClusterSetBad(t *testing.T) {
 func TestClusterSetBad(t *testing.T) {
 	tests := []string{
 	tests := []string{
-		"mem1=,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379",
-		"mem1,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379",
+		"mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
+		"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
 		// TODO(philips): anyone know of a 64 bit sha1 hash collision
 		// TODO(philips): anyone know of a 64 bit sha1 hash collision
-		// "06b2f82fd81b2c20=128.193.4.20:2379,02c60cb75083ceef=128.193.4.20:2379",
+		// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		g := Cluster{}
 		g := Cluster{}
@@ -151,7 +151,7 @@ func TestClusterPeerURLs(t *testing.T) {
 		// single peer with a single address
 		// single peer with a single address
 		{
 		{
 			mems: []Member{
 			mems: []Member{
-				{ID: 1, PeerURLs: []string{"192.0.2.1"}},
+				{ID: 1, PeerURLs: []string{"http://192.0.2.1"}},
 			},
 			},
 			wurls: []string{"http://192.0.2.1"},
 			wurls: []string{"http://192.0.2.1"},
 		},
 		},
@@ -159,7 +159,7 @@ func TestClusterPeerURLs(t *testing.T) {
 		// single peer with a single address with a port
 		// single peer with a single address with a port
 		{
 		{
 			mems: []Member{
 			mems: []Member{
-				{ID: 1, PeerURLs: []string{"192.0.2.1:8001"}},
+				{ID: 1, PeerURLs: []string{"http://192.0.2.1:8001"}},
 			},
 			},
 			wurls: []string{"http://192.0.2.1:8001"},
 			wurls: []string{"http://192.0.2.1:8001"},
 		},
 		},
@@ -167,9 +167,9 @@ func TestClusterPeerURLs(t *testing.T) {
 		// several members explicitly unsorted
 		// several members explicitly unsorted
 		{
 		{
 			mems: []Member{
 			mems: []Member{
-				{ID: 2, PeerURLs: []string{"192.0.2.3", "192.0.2.4"}},
-				{ID: 3, PeerURLs: []string{"192.0.2.5", "192.0.2.6"}},
-				{ID: 1, PeerURLs: []string{"192.0.2.1", "192.0.2.2"}},
+				{ID: 2, PeerURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}},
+				{ID: 3, PeerURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}},
+				{ID: 1, PeerURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}},
 			},
 			},
 			wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
 			wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
 		},
 		},
@@ -210,7 +210,7 @@ func TestClusterClientURLs(t *testing.T) {
 		// single peer with a single address
 		// single peer with a single address
 		{
 		{
 			mems: []Member{
 			mems: []Member{
-				{ID: 1, ClientURLs: []string{"192.0.2.1"}},
+				{ID: 1, ClientURLs: []string{"http://192.0.2.1"}},
 			},
 			},
 			wurls: []string{"http://192.0.2.1"},
 			wurls: []string{"http://192.0.2.1"},
 		},
 		},
@@ -218,7 +218,7 @@ func TestClusterClientURLs(t *testing.T) {
 		// single peer with a single address with a port
 		// single peer with a single address with a port
 		{
 		{
 			mems: []Member{
 			mems: []Member{
-				{ID: 1, ClientURLs: []string{"192.0.2.1:8001"}},
+				{ID: 1, ClientURLs: []string{"http://192.0.2.1:8001"}},
 			},
 			},
 			wurls: []string{"http://192.0.2.1:8001"},
 			wurls: []string{"http://192.0.2.1:8001"},
 		},
 		},
@@ -226,9 +226,9 @@ func TestClusterClientURLs(t *testing.T) {
 		// several members explicitly unsorted
 		// several members explicitly unsorted
 		{
 		{
 			mems: []Member{
 			mems: []Member{
-				{ID: 2, ClientURLs: []string{"192.0.2.3", "192.0.2.4"}},
-				{ID: 3, ClientURLs: []string{"192.0.2.5", "192.0.2.6"}},
-				{ID: 1, ClientURLs: []string{"192.0.2.1", "192.0.2.2"}},
+				{ID: 2, ClientURLs: []string{"http://192.0.2.3", "http://192.0.2.4"}},
+				{ID: 3, ClientURLs: []string{"http://192.0.2.5", "http://192.0.2.6"}},
+				{ID: 1, ClientURLs: []string{"http://192.0.2.1", "http://192.0.2.2"}},
 			},
 			},
 			wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
 			wurls: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
 		},
 		},

+ 3 - 3
etcdserver/etcdhttp/http_test.go

@@ -612,9 +612,9 @@ func TestV2MachinesEndpoint(t *testing.T) {
 func TestServeMachines(t *testing.T) {
 func TestServeMachines(t *testing.T) {
 	cluster := &fakeCluster{
 	cluster := &fakeCluster{
 		members: []etcdserver.Member{
 		members: []etcdserver.Member{
-			{ID: 0xBEEF0, ClientURLs: []string{"localhost:8080"}},
-			{ID: 0xBEEF1, ClientURLs: []string{"localhost:8081"}},
-			{ID: 0xBEEF2, ClientURLs: []string{"localhost:8082"}},
+			{ID: 0xBEEF0, ClientURLs: []string{"http://localhost:8080"}},
+			{ID: 0xBEEF1, ClientURLs: []string{"http://localhost:8081"}},
+			{ID: 0xBEEF2, ClientURLs: []string{"http://localhost:8082"}},
 		},
 		},
 	}
 	}
 
 

+ 7 - 1
etcdserver/member.go

@@ -3,9 +3,11 @@ package etcdserver
 import (
 import (
 	"crypto/sha1"
 	"crypto/sha1"
 	"encoding/binary"
 	"encoding/binary"
+	"fmt"
 	"path"
 	"path"
 	"sort"
 	"sort"
 	"strconv"
 	"strconv"
+	"time"
 )
 )
 
 
 const machineKVPrefix = "/_etcd/machines/"
 const machineKVPrefix = "/_etcd/machines/"
@@ -20,7 +22,7 @@ type Member struct {
 
 
 // newMember creates a Member without an ID and generates one based on the
 // newMember creates a Member without an ID and generates one based on the
 // name, peer URLs. This is used for bootstrapping.
 // name, peer URLs. This is used for bootstrapping.
-func newMember(name string, peerURLs []string) *Member {
+func newMember(name string, peerURLs []string, now *time.Time) *Member {
 	sort.Strings(peerURLs)
 	sort.Strings(peerURLs)
 	m := &Member{Name: name, PeerURLs: peerURLs}
 	m := &Member{Name: name, PeerURLs: peerURLs}
 
 
@@ -29,6 +31,10 @@ func newMember(name string, peerURLs []string) *Member {
 		b = append(b, []byte(p)...)
 		b = append(b, []byte(p)...)
 	}
 	}
 
 
+	if now != nil {
+		b = append(b, []byte(fmt.Sprintf("%d", now.Unix()))...)
+	}
+
 	hash := sha1.Sum(b)
 	hash := sha1.Sum(b)
 	m.ID = int64(binary.BigEndian.Uint64(hash[:8]))
 	m.ID = int64(binary.BigEndian.Uint64(hash[:8]))
 	if m.ID < 0 {
 	if m.ID < 0 {

+ 29 - 0
etcdserver/member_test.go

@@ -0,0 +1,29 @@
+package etcdserver
+
+import (
+	"testing"
+	"time"
+)
+
+func timeParse(value string) *time.Time {
+	t, err := time.Parse(time.RFC3339, value)
+	if err != nil {
+		panic(err)
+	}
+	return &t
+}
+
+func TestMemberTime(t *testing.T) {
+	tests := []struct {
+		mem *Member
+		id  int64
+	}{
+		{newMember("mem1", []string{"http://10.0.0.8:2379"}, nil), 7206348984215161146},
+		{newMember("mem1", []string{"http://10.0.0.1:2379"}, timeParse("1984-12-23T15:04:05Z")), 5483967913615174889},
+	}
+	for i, tt := range tests {
+		if tt.mem.ID != tt.id {
+			t.Errorf("#%d: mem.ID = %v, want %v", i, tt.mem.ID, tt.id)
+		}
+	}
+}

+ 33 - 22
main.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"fmt"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
+	"net/url"
 	"os"
 	"os"
 	"path"
 	"path"
 	"strings"
 	"strings"
@@ -32,13 +33,15 @@ const (
 var (
 var (
 	name         = flag.String("name", "default", "Unique human-readable name for this node")
 	name         = flag.String("name", "default", "Unique human-readable name for this node")
 	timeout      = flag.Duration("timeout", 10*time.Second, "Request Timeout")
 	timeout      = flag.Duration("timeout", 10*time.Second, "Request Timeout")
-	paddr        = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')")
 	dir          = flag.String("data-dir", "", "Path to the data directory")
 	dir          = flag.String("data-dir", "", "Path to the data directory")
 	snapCount    = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 	snapCount    = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 	printVersion = flag.Bool("version", false, "Print the version and exit")
 	printVersion = flag.Bool("version", false, "Print the version and exit")
 
 
 	cluster   = &etcdserver.Cluster{}
 	cluster   = &etcdserver.Cluster{}
-	addrs     = &flagtypes.Addrs{}
+	lcurls    = &flagtypes.URLs{}
+	acurls    = &flagtypes.URLs{}
+	lpurls    = &flagtypes.URLs{}
+	apurls    = &flagtypes.URLs{}
 	cors      = &pkg.CORSInfo{}
 	cors      = &pkg.CORSInfo{}
 	proxyFlag = new(flagtypes.Proxy)
 	proxyFlag = new(flagtypes.Proxy)
 
 
@@ -66,11 +69,19 @@ var (
 
 
 func init() {
 func init() {
 	flag.Var(cluster, "bootstrap-config", "Initial cluster configuration for bootstrapping")
 	flag.Var(cluster, "bootstrap-config", "Initial cluster configuration for bootstrapping")
-	flag.Var(addrs, "bind-addr", "List of HTTP service addresses (e.g., '127.0.0.1:4001,10.0.0.1:8080')")
+	flag.Var(apurls, "advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster")
+	flag.Var(acurls, "advertise-client-urls", "List of this member's client URLs to advertise to the rest of the cluster")
+	flag.Var(lpurls, "listen-peer-urls", "List of this URLs to listen on for peer traffic")
+	flag.Var(lcurls, "listen-client-urls", "List of this URLs to listen on for client traffic")
 	flag.Var(cors, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
 	flag.Var(cors, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
 	flag.Var(proxyFlag, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(flagtypes.ProxyValues, ", ")))
 	flag.Var(proxyFlag, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(flagtypes.ProxyValues, ", ")))
-	cluster.Set("default=localhost:8080")
-	addrs.Set("127.0.0.1:4001")
+
+	cluster.Set("default=http://localhost:2380,default=http://localhost:7001")
+	lcurls.Set("http://localhost:2379,http://localhost:4001")
+	acurls.Set("http://localhost:2379,http://localhost:4001")
+	lpurls.Set("http://localhost:2380,http://localhost:7001")
+	apurls.Set("http://localhost:2380,http://localhost:7001")
+
 	proxyFlag.Set(flagtypes.ProxyValueOff)
 	proxyFlag.Set(flagtypes.ProxyValueOff)
 
 
 	flag.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.")
 	flag.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.")
@@ -202,27 +213,28 @@ func startEtcd() {
 	}
 	}
 	ph := etcdhttp.NewPeerHandler(s)
 	ph := etcdhttp.NewPeerHandler(s)
 
 
-	l, err := transport.NewListener(*paddr, peerTLSInfo)
-	if err != nil {
-		log.Fatal(err)
-	}
+	for _, u := range []url.URL(*lpurls) {
+		l, err := transport.NewListener(u.Host, peerTLSInfo)
+		if err != nil {
+			log.Fatal(err)
+		}
 
 
-	// Start the peer server in a goroutine
-	go func() {
-		log.Print("Listening for peers on ", *paddr)
-		log.Fatal(http.Serve(l, ph))
-	}()
+		// Start the peer server in a goroutine
+		go func() {
+			log.Print("Listening for peers on ", u.String())
+			log.Fatal(http.Serve(l, ph))
+		}()
+	}
 
 
 	// Start a client server goroutine for each listen address
 	// Start a client server goroutine for each listen address
-	for _, addr := range *addrs {
-		addr := addr
-		l, err := transport.NewListener(addr, clientTLSInfo)
+	for _, u := range []url.URL(*lcurls) {
+		l, err := transport.NewListener(u.Host, clientTLSInfo)
 		if err != nil {
 		if err != nil {
 			log.Fatal(err)
 			log.Fatal(err)
 		}
 		}
 
 
 		go func() {
 		go func() {
-			log.Print("Listening for client requests on ", addr)
+			log.Print("Listening for client requests on ", u.String())
 			log.Fatal(http.Serve(l, ch))
 			log.Fatal(http.Serve(l, ch))
 		}()
 		}()
 	}
 	}
@@ -250,15 +262,14 @@ func startProxy() {
 	}
 	}
 
 
 	// Start a proxy server goroutine for each listen address
 	// Start a proxy server goroutine for each listen address
-	for _, addr := range *addrs {
-		addr := addr
-		l, err := transport.NewListener(addr, clientTLSInfo)
+	for _, u := range []url.URL(*lcurls) {
+		l, err := transport.NewListener(u.Host, clientTLSInfo)
 		if err != nil {
 		if err != nil {
 			log.Fatal(err)
 			log.Fatal(err)
 		}
 		}
 
 
 		go func() {
 		go func() {
-			log.Print("Listening for client requests on ", addr)
+			log.Print("Listening for client requests on ", u.Host)
 			log.Fatal(http.Serve(l, ph))
 			log.Fatal(http.Serve(l, ph))
 		}()
 		}()
 	}
 	}

+ 46 - 0
pkg/flags/urls.go

@@ -0,0 +1,46 @@
+package flags
+
+import (
+	"errors"
+	"fmt"
+	"net/url"
+	"strings"
+)
+
+// URLs implements the flag.Value interface to allow users to define multiple
+// URLs on the command-line
+type URLs []url.URL
+
+// Set parses a command line set of URLs formatted like:
+// http://127.0.0.1:7001,http://10.1.1.2:80
+func (us *URLs) Set(s string) error {
+	strs := strings.Split(s, ",")
+	all := make([]url.URL, len(strs))
+	if len(all) == 0 {
+		return errors.New("no valid URLs given")
+	}
+	for _, in := range strs {
+		in = strings.TrimSpace(in)
+		u, err := url.Parse(in)
+		if err != nil {
+			return err
+		}
+		if u.Scheme != "http" && u.Scheme != "https" {
+			return fmt.Errorf("URL scheme must be http or https: %s", s)
+		}
+		if u.Path != "" {
+			return fmt.Errorf("URL must not contain a path: %s", s)
+		}
+		all = append(all, *u)
+	}
+	*us = all
+	return nil
+}
+
+func (us *URLs) String() string {
+	all := make([]string, len(*us))
+	for i, u := range *us {
+		all[i] = u.String()
+	}
+	return strings.Join(all, ",")
+}

+ 45 - 0
pkg/flags/urls_test.go

@@ -0,0 +1,45 @@
+package flags
+
+import (
+	"testing"
+)
+
+func TestValidateURLsBad(t *testing.T) {
+	tests := []string{
+		// bad IP specification
+		":4001",
+		"127.0:8080",
+		"123:456",
+		// bad port specification
+		"127.0.0.1:foo",
+		"127.0.0.1:",
+		// unix sockets not supported
+		"unix://",
+		"unix://tmp/etcd.sock",
+		// bad strings
+		"somewhere",
+		"234#$",
+		"file://foo/bar",
+		"http://hello/asdf",
+	}
+	for i, in := range tests {
+		u := URLs{}
+		if err := u.Set(in); err == nil {
+			t.Errorf(`#%d: unexpected nil error for in=%q`, i, in)
+		}
+	}
+}
+
+func TestValidateURLsGood(t *testing.T) {
+	tests := []string{
+		"https://1.2.3.4:8080",
+		"http://10.1.1.1:80",
+		"http://10.1.1.1",
+	}
+	for i, in := range tests {
+		u := URLs{}
+		if err := u.Set(in); err != nil {
+			t.Errorf("#%d: err=%v, want nil for in=%q", i, err, in)
+		}
+	}
+}