Browse Source

Merge pull request #1147 from philips/add-name

introduce cluster store
Brandon Philips 11 years ago
parent
commit
61dc89e7f3

+ 4 - 4
Procfile

@@ -1,5 +1,5 @@
 # Use goreman to run `go get github.com/mattn/goreman`
-etcd1: bin/etcd -id 0x1 -bind-addr 127.0.0.1:4001 -peer-bind-addr :7001 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
-etcd2: bin/etcd -id 0x2 -bind-addr 127.0.0.1:4002 -peer-bind-addr :7002 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
-etcd3: bin/etcd -id 0x3 -bind-addr 127.0.0.1:4003 -peer-bind-addr :7003 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
-proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers '0x1=localhost:7001&0x2=localhost:7002&0x3=localhost:7003'
+etcd1: bin/etcd -name node1 -bind-addr 127.0.0.1:4001 -peer-bind-addr :7001 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003'
+etcd2: bin/etcd -name node2 -bind-addr 127.0.0.1:4002 -peer-bind-addr :7002 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003'
+etcd3: bin/etcd -name node3 -bind-addr 127.0.0.1:4003 -peer-bind-addr :7003 -bootstrap-config 'node1=localhost:7001,node2=localhost:7002,node3=localhost:7003'
+#proxy: bin/etcd -proxy=on -bind-addr 127.0.0.1:8080 -peers 'localhost:7001,localhost:7002,localhost:7003'

+ 114 - 0
etcdserver/cluster.go

@@ -0,0 +1,114 @@
+package etcdserver
+
+import (
+	"fmt"
+	"math/rand"
+	"net/url"
+	"sort"
+	"strings"
+)
+
+// Cluster is a list of Members that belong to the same raft cluster
+type Cluster map[int64]*Member
+
+func (c Cluster) FindID(id int64) *Member {
+	return c[id]
+}
+
+func (c Cluster) FindName(name string) *Member {
+	for _, m := range c {
+		if m.Name == name {
+			return m
+		}
+	}
+
+	return nil
+}
+
+func (c Cluster) Add(m Member) error {
+	if c.FindID(m.ID) != nil {
+		return fmt.Errorf("Member exists with identical ID %v", m)
+	}
+	c[m.ID] = &m
+	return nil
+}
+
+func (c *Cluster) AddSlice(mems []Member) error {
+	for _, m := range mems {
+		err := c.Add(m)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Pick chooses a random address from a given Member's addresses, and returns it as
+// an addressible URI. If the given member does not exist, an empty string is returned.
+func (c Cluster) Pick(id int64) string {
+	if m := c.FindID(id); m != nil {
+		addrs := m.PeerURLs
+		if len(addrs) == 0 {
+			return ""
+		}
+		return addrs[rand.Intn(len(addrs))]
+	}
+
+	return ""
+}
+
+// Set parses command line sets of names to IPs formatted like:
+// mach0=1.1.1.1,mach0=2.2.2.2,mach0=1.1.1.1,mach1=2.2.2.2,mach1=3.3.3.3
+func (c *Cluster) Set(s string) error {
+	*c = Cluster{}
+	v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
+	if err != nil {
+		return err
+	}
+
+	for name, urls := range v {
+		if len(urls) == 0 || urls[0] == "" {
+			return fmt.Errorf("Empty URL given for %q", name)
+		}
+		m := newMember(name, urls)
+		err := c.Add(*m)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c Cluster) String() string {
+	sl := []string{}
+	for _, m := range c {
+		for _, u := range m.PeerURLs {
+			sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
+		}
+	}
+	sort.Strings(sl)
+	return strings.Join(sl, ",")
+}
+
+func (c Cluster) IDs() []int64 {
+	var ids []int64
+	for _, m := range c {
+		ids = append(ids, m.ID)
+	}
+	return ids
+}
+
+// Endpoints returns a list of all peer addresses. Each address is prefixed
+// with the scheme (currently "http://"). The returned list is sorted in
+// ascending lexicographical order.
+func (c Cluster) Endpoints() []string {
+	endpoints := make([]string, 0)
+	for _, p := range c {
+		for _, addr := range p.PeerURLs {
+			endpoints = append(endpoints, addScheme(addr))
+		}
+	}
+	sort.Strings(endpoints)
+	return endpoints
+}

+ 141 - 0
etcdserver/cluster_store.go

@@ -0,0 +1,141 @@
+package etcdserver
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/store"
+)
+
+const (
+	raftPrefix = "/raft"
+)
+
+type ClusterStore interface {
+	Get() Cluster
+	Delete(id int64)
+}
+
+type clusterStore struct {
+	Store store.Store
+}
+
+func NewClusterStore(st store.Store, c Cluster) ClusterStore {
+	cls := &clusterStore{Store: st}
+	for _, m := range c {
+		cls.add(*m)
+	}
+	return cls
+}
+
+// add puts a new Member into the store.
+// A Member with a matching id must not exist.
+func (s *clusterStore) add(m Member) {
+	b, err := json.Marshal(m)
+	if err != nil {
+		log.Panicf("marshal peer info error: %v", err)
+	}
+
+	if _, err := s.Store.Create(m.storeKey(), false, string(b), false, store.Permanent); err != nil {
+		log.Panicf("add member should never fail: %v", err)
+	}
+}
+
+// TODO(philips): keep the latest copy without going to the store to avoid the
+// lock here.
+func (s *clusterStore) Get() Cluster {
+	c := &Cluster{}
+	e, err := s.Store.Get(machineKVPrefix, true, false)
+	if err != nil {
+		log.Panicf("get member should never fail: %v", err)
+	}
+	for _, n := range e.Node.Nodes {
+		m := Member{}
+		if err := json.Unmarshal([]byte(*n.Value), &m); err != nil {
+			log.Panicf("unmarshal peer error: %v", err)
+		}
+		err := c.Add(m)
+		if err != nil {
+			log.Panicf("add member to cluster should never fail: %v", err)
+		}
+	}
+	return *c
+}
+
+// Delete removes a member from the store.
+// The given id MUST exist.
+func (s *clusterStore) Delete(id int64) {
+	p := s.Get().FindID(id).storeKey()
+	if _, err := s.Store.Delete(p, false, false); err != nil {
+		log.Panicf("delete peer should never fail: %v", err)
+	}
+}
+
+// addScheme adds the protocol prefix to a string; currently only HTTP
+// TODO: improve this when implementing TLS
+func addScheme(addr string) string {
+	return fmt.Sprintf("http://%s", addr)
+}
+
+func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
+	c := &http.Client{Transport: t}
+
+	scheme := "http"
+	if t.TLSClientConfig != nil {
+		scheme = "https"
+	}
+
+	return func(msgs []raftpb.Message) {
+		for _, m := range msgs {
+			// TODO: reuse go routines
+			// limit the number of outgoing connections for the same receiver
+			go send(c, scheme, cls, m)
+		}
+	}
+}
+
+func send(c *http.Client, scheme string, cls ClusterStore, m raftpb.Message) {
+	// TODO (xiangli): reasonable retry logic
+	for i := 0; i < 3; i++ {
+		addr := cls.Get().Pick(m.To)
+		if addr == "" {
+			// TODO: unknown peer id.. what do we do? I
+			// don't think his should ever happen, need to
+			// look into this further.
+			log.Printf("etcdhttp: no addr for %d", m.To)
+			return
+		}
+
+		url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix)
+
+		// TODO: don't block. we should be able to have 1000s
+		// of messages out at a time.
+		data, err := m.Marshal()
+		if err != nil {
+			log.Println("etcdhttp: dropping message:", err)
+			return // drop bad message
+		}
+		if httpPost(c, url, data) {
+			return // success
+		}
+		// TODO: backoff
+	}
+}
+
+func httpPost(c *http.Client, url string, data []byte) bool {
+	resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
+	if err != nil {
+		// TODO: log the error?
+		return false
+	}
+	resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		// TODO: log the error?
+		return false
+	}
+	return true
+}

+ 115 - 0
etcdserver/cluster_store_test.go

@@ -0,0 +1,115 @@
+package etcdserver
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/store"
+)
+
+func TestClusterStoreGet(t *testing.T) {
+	tests := []struct {
+		mems  []Member
+		wmems []Member
+	}{
+		{
+			[]Member{{Name: "node1", ID: 1}},
+			[]Member{{Name: "node1", ID: 1}},
+		},
+		{
+			[]Member{},
+			[]Member{},
+		},
+		{
+			[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
+			[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
+		},
+		{
+			[]Member{{Name: "node2", ID: 2}, {Name: "node1", ID: 1}},
+			[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
+		},
+	}
+	for i, tt := range tests {
+		c := Cluster{}
+		err := c.AddSlice(tt.mems)
+		if err != nil {
+			t.Error(err)
+		}
+
+		cs := NewClusterStore(&getAllStore{}, c)
+
+		if g := cs.Get(); !reflect.DeepEqual(g, c) {
+			t.Errorf("#%d: mems = %v, want %v", i, g, c)
+		}
+	}
+}
+
+func TestClusterStoreDelete(t *testing.T) {
+	st := &storeGetAllDeleteRecorder{}
+	c := Cluster{}
+	c.Add(Member{Name: "node", ID: 1})
+	cs := NewClusterStore(st, c)
+	cs.Delete(1)
+
+	wdeletes := []string{machineKVPrefix + "1"}
+	if !reflect.DeepEqual(st.deletes, wdeletes) {
+		t.Error("deletes = %v, want %v", st.deletes, wdeletes)
+	}
+}
+
+// simpleStore implements basic create and get.
+type simpleStore struct {
+	storeRecorder
+	st map[string]string
+}
+
+func (s *simpleStore) Create(key string, _ bool, value string, _ bool, _ time.Time) (*store.Event, error) {
+	if s.st == nil {
+		s.st = make(map[string]string)
+	}
+	s.st[key] = value
+	return nil, nil
+}
+func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) {
+	val, ok := s.st[key]
+	if !ok {
+		return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)
+	}
+	ev := &store.Event{Node: &store.NodeExtern{Key: key, Value: stringp(val)}}
+	return ev, nil
+}
+
+// getAllStore inherits simpleStore, and makes Get return all keys.
+type getAllStore struct {
+	simpleStore
+}
+
+func (s *getAllStore) Get(_ string, _, _ bool) (*store.Event, error) {
+	nodes := make([]*store.NodeExtern, 0)
+	for k, v := range s.st {
+		nodes = append(nodes, &store.NodeExtern{Key: k, Value: stringp(v)})
+	}
+	return &store.Event{Node: &store.NodeExtern{Nodes: nodes}}, nil
+}
+
+type storeDeleteRecorder struct {
+	storeRecorder
+	deletes []string
+}
+
+func (s *storeDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
+	s.deletes = append(s.deletes, key)
+	return nil, nil
+}
+
+type storeGetAllDeleteRecorder struct {
+	getAllStore
+	deletes []string
+}
+
+func (s *storeGetAllDeleteRecorder) Delete(key string, _, _ bool) (*store.Event, error) {
+	s.deletes = append(s.deletes, key)
+	return nil, nil
+}

+ 143 - 0
etcdserver/cluster_test.go

@@ -0,0 +1,143 @@
+package etcdserver
+
+import (
+	"testing"
+)
+
+func TestClusterFind(t *testing.T) {
+	tests := []struct {
+		id    int64
+		name  string
+		mems  []Member
+		match bool
+	}{
+		{
+			1,
+			"node1",
+			[]Member{{Name: "node1", ID: 1}},
+			true,
+		},
+		{
+			2,
+			"foobar",
+			[]Member{},
+			false,
+		},
+		{
+			2,
+			"node2",
+			[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
+			true,
+		},
+		{
+			3,
+			"node3",
+			[]Member{{Name: "node1", ID: 1}, {Name: "node2", ID: 2}},
+			false,
+		},
+	}
+	for i, tt := range tests {
+		c := Cluster{}
+		c.AddSlice(tt.mems)
+
+		m := c.FindName(tt.name)
+		if m == nil && !tt.match {
+			continue
+		}
+		if m == nil && tt.match {
+			t.Errorf("#%d: expected match got empty", i)
+		}
+		if m.Name != tt.name && tt.match {
+			t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.name)
+		}
+	}
+
+	for i, tt := range tests {
+		c := Cluster{}
+		c.AddSlice(tt.mems)
+
+		m := c.FindID(tt.id)
+		if m == nil && !tt.match {
+			continue
+		}
+		if m == nil && tt.match {
+			t.Errorf("#%d: expected match got empty", i)
+		}
+		if m.ID != tt.id && tt.match {
+			t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.id)
+		}
+	}
+}
+
+func TestClusterSet(t *testing.T) {
+	tests := []struct {
+		f     string
+		mems  []Member
+		parse bool
+	}{
+		{
+			"mem1=10.0.0.1:2379,mem1=128.193.4.20:2379,mem2=10.0.0.2:2379,default=127.0.0.1:2379",
+			[]Member{
+				{ID: 3736794188555456841, Name: "mem1", PeerURLs: []string{"10.0.0.1:2379", "128.193.4.20:2379"}},
+				{ID: 5674507346857578431, Name: "mem2", PeerURLs: []string{"10.0.0.2:2379"}},
+				{ID: 2676999861503984872, Name: "default", PeerURLs: []string{"127.0.0.1:2379"}},
+			},
+			true,
+		},
+	}
+	for i, tt := range tests {
+		c := Cluster{}
+		err := c.AddSlice(tt.mems)
+		if err != nil {
+			t.Error(err)
+		}
+
+		g := Cluster{}
+		g.Set(tt.f)
+
+		if g.String() != c.String() {
+			t.Errorf("#%d: set = %v, want %v", i, g, c)
+		}
+	}
+}
+
+func TestClusterSetBad(t *testing.T) {
+	tests := []string{
+		"mem1=,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379",
+		"mem1,mem2=128.193.4.20:2379,mem3=10.0.0.2:2379",
+		// TODO(philips): anyone know of a 64 bit sha1 hash collision
+		// "06b2f82fd81b2c20=128.193.4.20:2379,02c60cb75083ceef=128.193.4.20:2379",
+	}
+	for i, tt := range tests {
+		g := Cluster{}
+		err := g.Set(tt)
+		if err == nil {
+			t.Errorf("#%d: set = %v, want err", i, tt)
+		}
+	}
+}
+
+func TestClusterAddBad(t *testing.T) {
+	tests := []struct {
+		mems []Member
+	}{
+		{
+			[]Member{
+				{ID: 1, Name: "mem1"},
+				{ID: 1, Name: "mem2"},
+			},
+		},
+	}
+
+	c := &Cluster{}
+	c.Add(Member{ID: 1, Name: "mem1"})
+
+	for i, tt := range tests {
+		for _, m := range tt.mems {
+			err := c.Add(m)
+			if err == nil {
+				t.Errorf("#%d: set = %v, want err", i, m)
+			}
+		}
+	}
+}

+ 10 - 10
etcdserver/etcdhttp/http.go

@@ -35,12 +35,12 @@ const (
 var errClosed = errors.New("etcdhttp: client closed connection")
 
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
-func NewClientHandler(server *etcdserver.EtcdServer, peers Peers, timeout time.Duration) http.Handler {
+func NewClientHandler(server *etcdserver.EtcdServer, clusterStore etcdserver.ClusterStore, timeout time.Duration) http.Handler {
 	sh := &serverHandler{
-		server:  server,
-		peers:   peers,
-		timer:   server,
-		timeout: timeout,
+		server:       server,
+		clusterStore: clusterStore,
+		timer:        server,
+		timeout:      timeout,
 	}
 	if sh.timeout == 0 {
 		sh.timeout = defaultServerTimeout
@@ -68,10 +68,10 @@ func NewPeerHandler(server etcdserver.Server) http.Handler {
 
 // serverHandler provides http.Handlers for etcd client and raft communication.
 type serverHandler struct {
-	timeout time.Duration
-	server  etcdserver.Server
-	timer   etcdserver.RaftTimer
-	peers   Peers
+	timeout      time.Duration
+	server       etcdserver.Server
+	timer        etcdserver.RaftTimer
+	clusterStore etcdserver.ClusterStore
 }
 
 func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
@@ -115,7 +115,7 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r.Method, "GET", "HEAD") {
 		return
 	}
-	endpoints := h.peers.Endpoints()
+	endpoints := h.clusterStore.Get().Endpoints()
 	w.Write([]byte(strings.Join(endpoints, ", ")))
 }
 

+ 52 - 31
etcdserver/etcdhttp/http_test.go

@@ -589,7 +589,7 @@ func TestV2MachinesEndpoint(t *testing.T) {
 		{"POST", http.StatusMethodNotAllowed},
 	}
 
-	m := NewClientHandler(nil, Peers{}, time.Hour)
+	m := NewClientHandler(nil, &fakeCluster{}, time.Hour)
 	s := httptest.NewServer(m)
 	defer s.Close()
 
@@ -610,15 +610,20 @@ func TestV2MachinesEndpoint(t *testing.T) {
 }
 
 func TestServeMachines(t *testing.T) {
-	peers := Peers{}
-	peers.Set("0xBEEF0=localhost:8080&0xBEEF1=localhost:8081&0xBEEF2=localhost:8082")
+	cluster := &fakeCluster{
+		members: []etcdserver.Member{
+			{ID: 0xBEEF0, PeerURLs: []string{"localhost:8080"}},
+			{ID: 0xBEEF1, PeerURLs: []string{"localhost:8081"}},
+			{ID: 0xBEEF2, PeerURLs: []string{"localhost:8082"}},
+		},
+	}
 
 	writer := httptest.NewRecorder()
 	req, err := http.NewRequest("GET", "", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	h := &serverHandler{peers: peers}
+	h := &serverHandler{clusterStore: cluster}
 	h.serveMachines(writer, req)
 	w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
 	if g := writer.Body.String(); g != w {
@@ -629,56 +634,64 @@ func TestServeMachines(t *testing.T) {
 	}
 }
 
-func TestPeersEndpoints(t *testing.T) {
+func TestClusterGetEndpoints(t *testing.T) {
 	tests := []struct {
-		peers     Peers
-		endpoints []string
+		clusterStore etcdserver.ClusterStore
+		endpoints    []string
 	}{
 		// single peer with a single address
 		{
-			peers: Peers(map[int64][]string{
-				1: []string{"192.0.2.1"},
-			}),
+			clusterStore: &fakeCluster{
+				members: []etcdserver.Member{
+					{ID: 1, PeerURLs: []string{"192.0.2.1"}},
+				},
+			},
 			endpoints: []string{"http://192.0.2.1"},
 		},
 
 		// single peer with a single address with a port
 		{
-			peers: Peers(map[int64][]string{
-				1: []string{"192.0.2.1:8001"},
-			}),
+			clusterStore: &fakeCluster{
+				members: []etcdserver.Member{
+					{ID: 1, PeerURLs: []string{"192.0.2.1:8001"}},
+				},
+			},
 			endpoints: []string{"http://192.0.2.1:8001"},
 		},
 
-		// several peers explicitly unsorted
+		// several members explicitly unsorted
 		{
-			peers: Peers(map[int64][]string{
-				2: []string{"192.0.2.3", "192.0.2.4"},
-				3: []string{"192.0.2.5", "192.0.2.6"},
-				1: []string{"192.0.2.1", "192.0.2.2"},
-			}),
+			clusterStore: &fakeCluster{
+				members: []etcdserver.Member{
+					{ID: 2, PeerURLs: []string{"192.0.2.3", "192.0.2.4"}},
+					{ID: 3, PeerURLs: []string{"192.0.2.5", "192.0.2.6"}},
+					{ID: 1, PeerURLs: []string{"192.0.2.1", "192.0.2.2"}},
+				},
+			},
 			endpoints: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
 		},
 
-		// no peers
+		// no members
 		{
-			peers:     Peers(map[int64][]string{}),
-			endpoints: []string{},
+			clusterStore: &fakeCluster{members: []etcdserver.Member{}},
+			endpoints:    []string{},
 		},
 
 		// peer with no endpoints
 		{
-			peers: Peers(map[int64][]string{
-				3: []string{},
-			}),
+			clusterStore: &fakeCluster{
+				members: []etcdserver.Member{
+					{ID: 3, PeerURLs: []string{}},
+				},
+			},
 			endpoints: []string{},
 		},
 	}
 
 	for i, tt := range tests {
-		endpoints := tt.peers.Endpoints()
+		endpoints := tt.clusterStore.Get().Endpoints()
 		if !reflect.DeepEqual(tt.endpoints, endpoints) {
-			t.Errorf("#%d: peers.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints)
+			t.Errorf("#%d: members.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints)
 		}
 	}
 }
@@ -868,7 +881,6 @@ func TestServeRaft(t *testing.T) {
 		h := &serverHandler{
 			timeout: time.Hour,
 			server:  &errServer{tt.serverErr},
-			peers:   nil,
 		}
 		rw := httptest.NewRecorder()
 		h.serveRaft(rw, req)
@@ -957,7 +969,6 @@ func TestBadServeKeys(t *testing.T) {
 		h := &serverHandler{
 			timeout: 0, // context times out immediately
 			server:  tt.server,
-			peers:   nil,
 		}
 		rw := httptest.NewRecorder()
 		h.serveKeys(rw, tt.req)
@@ -980,7 +991,6 @@ func TestServeKeysEvent(t *testing.T) {
 	h := &serverHandler{
 		timeout: time.Hour,
 		server:  server,
-		peers:   nil,
 		timer:   &dummyRaftTimer{},
 	}
 	rw := httptest.NewRecorder()
@@ -1019,7 +1029,6 @@ func TestServeKeysWatch(t *testing.T) {
 	h := &serverHandler{
 		timeout: time.Hour,
 		server:  server,
-		peers:   nil,
 		timer:   &dummyRaftTimer{},
 	}
 	go func() {
@@ -1295,3 +1304,15 @@ func TestHandleWatchStreaming(t *testing.T) {
 		t.Fatalf("timed out waiting for done")
 	}
 }
+
+type fakeCluster struct {
+	members []etcdserver.Member
+}
+
+func (c *fakeCluster) Get() etcdserver.Cluster {
+	cl := &etcdserver.Cluster{}
+	cl.AddSlice(c.members)
+	return *cl
+}
+
+func (c *fakeCluster) Delete(id int64) { return }

+ 0 - 157
etcdserver/etcdhttp/peers.go

@@ -1,157 +0,0 @@
-package etcdhttp
-
-import (
-	"bytes"
-	"fmt"
-	"log"
-	"math/rand"
-	"net/http"
-	"net/url"
-	"sort"
-	"strconv"
-
-	"github.com/coreos/etcd/raft/raftpb"
-)
-
-// Peers contains a mapping of unique IDs to a list of hostnames/IP addresses
-type Peers map[int64][]string
-
-// addScheme adds the protocol prefix to a string; currently only HTTP
-// TODO: improve this when implementing TLS
-func addScheme(addr string) string {
-	return fmt.Sprintf("http://%s", addr)
-}
-
-// Pick returns a random address from a given Peer's addresses. If the
-// given peer does not exist, an empty string is returned.
-func (ps Peers) Pick(id int64) string {
-	addrs := ps[id]
-	if len(addrs) == 0 {
-		return ""
-	}
-	return addrs[rand.Intn(len(addrs))]
-}
-
-// Set parses command line sets of names to IPs formatted like:
-// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2
-func (ps *Peers) Set(s string) error {
-	m := make(map[int64][]string)
-	v, err := url.ParseQuery(s)
-	if err != nil {
-		return err
-	}
-	for k, v := range v {
-		id, err := strconv.ParseInt(k, 0, 64)
-		if err != nil {
-			return err
-		}
-		m[id] = v
-	}
-	*ps = m
-	return nil
-}
-
-func (ps *Peers) String() string {
-	v := url.Values{}
-	for k, vv := range *ps {
-		for i := range vv {
-			v.Add(strconv.FormatInt(k, 16), vv[i])
-		}
-	}
-	return v.Encode()
-}
-
-func (ps Peers) IDs() []int64 {
-	var ids []int64
-	for id := range ps {
-		ids = append(ids, id)
-	}
-	return ids
-}
-
-// Endpoints returns a list of all peer addresses. Each address is prefixed
-// with the scheme (currently "http://"). The returned list is sorted in
-// ascending lexicographical order.
-func (ps Peers) Endpoints() []string {
-	endpoints := make([]string, 0)
-	for _, addrs := range ps {
-		for _, addr := range addrs {
-			endpoints = append(endpoints, addScheme(addr))
-		}
-	}
-	sort.Strings(endpoints)
-
-	return endpoints
-}
-
-// Addrs returns a list of all peer addresses. The returned list is sorted
-// in ascending lexicographical order.
-func (ps Peers) Addrs() []string {
-	addrs := make([]string, 0)
-	for _, paddrs := range ps {
-		for _, paddr := range paddrs {
-			addrs = append(addrs, paddr)
-		}
-	}
-	sort.Strings(addrs)
-	return addrs
-}
-
-func Sender(t *http.Transport, p Peers) func(msgs []raftpb.Message) {
-	c := &http.Client{Transport: t}
-
-	scheme := "http"
-	if t.TLSClientConfig != nil {
-		scheme = "https"
-	}
-
-	return func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			// TODO: reuse go routines
-			// limit the number of outgoing connections for the same receiver
-			go send(c, scheme, p, m)
-		}
-	}
-}
-
-func send(c *http.Client, scheme string, p Peers, m raftpb.Message) {
-	// TODO (xiangli): reasonable retry logic
-	for i := 0; i < 3; i++ {
-		addr := p.Pick(m.To)
-		if addr == "" {
-			// TODO: unknown peer id.. what do we do? I
-			// don't think his should ever happen, need to
-			// look into this further.
-			log.Printf("etcdhttp: no addr for %d", m.To)
-			return
-		}
-
-		url := fmt.Sprintf("%s://%s%s", scheme, addr, raftPrefix)
-
-		// TODO: don't block. we should be able to have 1000s
-		// of messages out at a time.
-		data, err := m.Marshal()
-		if err != nil {
-			log.Println("etcdhttp: dropping message:", err)
-			return // drop bad message
-		}
-		if httpPost(c, url, data) {
-			return // success
-		}
-		// TODO: backoff
-	}
-}
-
-func httpPost(c *http.Client, url string, data []byte) bool {
-	resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
-	if err != nil {
-		// TODO: log the error?
-		return false
-	}
-	resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		// TODO: log the error?
-		return false
-	}
-	return true
-}

+ 0 - 248
etcdserver/etcdhttp/peers_test.go

@@ -1,248 +0,0 @@
-package etcdhttp
-
-import (
-	"net/http"
-	"net/http/httptest"
-	"reflect"
-	"sort"
-	"strings"
-	"testing"
-
-	"github.com/coreos/etcd/raft/raftpb"
-)
-
-func TestPeers(t *testing.T) {
-	tests := []struct {
-		in      string
-		wids    []int64
-		wep     []string
-		waddrs  []string
-		wstring string
-	}{
-		{
-			"1=1.1.1.1",
-			[]int64{1},
-			[]string{"http://1.1.1.1"},
-			[]string{"1.1.1.1"},
-			"1=1.1.1.1",
-		},
-		{
-			"2=2.2.2.2",
-			[]int64{2},
-			[]string{"http://2.2.2.2"},
-			[]string{"2.2.2.2"},
-			"2=2.2.2.2",
-		},
-		{
-			"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
-			[]int64{1, 2},
-			[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2"},
-			[]string{"1.1.1.1", "1.1.1.2", "2.2.2.2"},
-			"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
-		},
-		{
-			"3=3.3.3.3&4=4.4.4.4&1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
-			[]int64{1, 2, 3, 4},
-			[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2",
-				"http://3.3.3.3", "http://4.4.4.4"},
-			[]string{"1.1.1.1", "1.1.1.2", "2.2.2.2", "3.3.3.3", "4.4.4.4"},
-			"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2&3=3.3.3.3&4=4.4.4.4",
-		},
-	}
-	for i, tt := range tests {
-		p := &Peers{}
-		err := p.Set(tt.in)
-		if err != nil {
-			t.Errorf("#%d: err=%v, want nil", i, err)
-		}
-		ids := int64Slice(p.IDs())
-		sort.Sort(ids)
-		if !reflect.DeepEqual([]int64(ids), tt.wids) {
-			t.Errorf("#%d: IDs=%#v, want %#v", i, []int64(ids), tt.wids)
-		}
-		ep := p.Endpoints()
-		if !reflect.DeepEqual(ep, tt.wep) {
-			t.Errorf("#%d: Endpoints=%#v, want %#v", i, ep, tt.wep)
-		}
-		addrs := p.Addrs()
-		if !reflect.DeepEqual(addrs, tt.waddrs) {
-			t.Errorf("#%d: addrs=%#v, want %#v", i, ep, tt.waddrs)
-		}
-		s := p.String()
-		if s != tt.wstring {
-			t.Errorf("#%d: string=%q, want %q", i, s, tt.wstring)
-		}
-	}
-}
-
-type int64Slice []int64
-
-func (p int64Slice) Len() int           { return len(p) }
-func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
-func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
-
-func TestPeersSetBad(t *testing.T) {
-	tests := []string{
-		// garbage URL
-		"asdf%%",
-		// non-int64 keys
-		"a=1.2.3.4",
-		"-1-23=1.2.3.4",
-	}
-	for i, tt := range tests {
-		p := &Peers{}
-		if err := p.Set(tt); err == nil {
-			t.Errorf("#%d: err=nil unexpectedly", i)
-		}
-	}
-}
-
-func TestPeersPick(t *testing.T) {
-	ps := &Peers{
-		1: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"},
-		2: []string{"xyz"},
-		3: []string{},
-	}
-	ids := map[string]bool{
-		"abc": true,
-		"def": true,
-		"ghi": true,
-		"jkl": true,
-		"mno": true,
-		"pqr": true,
-		"stu": true,
-	}
-	for i := 0; i < 1000; i++ {
-		a := ps.Pick(1)
-		if _, ok := ids[a]; !ok {
-			t.Errorf("returned ID %q not in expected range!", a)
-			break
-		}
-	}
-	if b := ps.Pick(2); b != "xyz" {
-		t.Errorf("id=%q, want %q", b, "xyz")
-	}
-	if c := ps.Pick(3); c != "" {
-		t.Errorf("id=%q, want \"\"", c)
-	}
-}
-
-func TestHttpPost(t *testing.T) {
-	var tr *http.Request
-	tests := []struct {
-		h http.HandlerFunc
-		w bool
-	}{
-		{
-			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				tr = r
-				w.WriteHeader(http.StatusNoContent)
-			}),
-			true,
-		},
-		{
-			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				tr = r
-				w.WriteHeader(http.StatusNotFound)
-			}),
-			false,
-		},
-		{
-			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				tr = r
-				w.WriteHeader(http.StatusInternalServerError)
-			}),
-			false,
-		},
-	}
-	for i, tt := range tests {
-		ts := httptest.NewServer(tt.h)
-		if g := httpPost(http.DefaultClient, ts.URL, []byte("adsf")); g != tt.w {
-			t.Errorf("#%d: httpPost()=%t, want %t", i, g, tt.w)
-		}
-		if tr.Method != "POST" {
-			t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST")
-		}
-		if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" {
-			t.Errorf("#%d: Content-Type=%q, want %q", i, ct, "application/protobuf")
-		}
-		tr = nil
-		ts.Close()
-	}
-
-	if httpPost(http.DefaultClient, "garbage url", []byte("data")) {
-		t.Errorf("httpPost with bad URL returned true unexpectedly!")
-	}
-}
-
-func TestSend(t *testing.T) {
-	var tr *http.Request
-	var rc int
-	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		tr = r
-		w.WriteHeader(rc)
-	})
-	tests := []struct {
-		m    raftpb.Message
-		code int
-
-		ok bool
-	}{
-		{
-			// all good
-			raftpb.Message{
-				To:   42,
-				Type: 4,
-			},
-			http.StatusNoContent,
-			true,
-		},
-		{
-			// bad response from server should be silently ignored
-			raftpb.Message{
-				To:   42,
-				Type: 2,
-			},
-			http.StatusInternalServerError,
-			true,
-		},
-		{
-			// unknown destination!
-			raftpb.Message{
-				To:   3,
-				Type: 2,
-			},
-			0,
-			false,
-		},
-	}
-
-	for i, tt := range tests {
-		tr = nil
-		rc = tt.code
-		ts := httptest.NewServer(h)
-		ps := Peers{
-			42: []string{strings.TrimPrefix(ts.URL, "http://")},
-		}
-		send(http.DefaultClient, "http", ps, tt.m)
-
-		if !tt.ok {
-			if tr != nil {
-				t.Errorf("#%d: got request=%#v, want nil", i, tr)
-			}
-			ts.Close()
-			continue
-		}
-
-		if tr.Method != "POST" {
-			t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST")
-		}
-		if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" {
-			t.Errorf("#%d: Content-Type=%q, want %q", i, ct, "application/protobuf")
-		}
-		if tr.URL.String() != "/raft" {
-			t.Errorf("#%d: URL=%q, want %q", i, tr.URL.String(), "/raft")
-		}
-		ts.Close()
-	}
-}

+ 43 - 0
etcdserver/member.go

@@ -0,0 +1,43 @@
+package etcdserver
+
+import (
+	"crypto/sha1"
+	"encoding/binary"
+	"path"
+	"sort"
+	"strconv"
+)
+
+const machineKVPrefix = "/_etcd/machines/"
+
+type Member struct {
+	ID   int64
+	Name string
+	// TODO(philips): ensure these are URLs
+	PeerURLs   []string
+	ClientURLs []string
+}
+
+// newMember creates a Member without an ID and generates one based on the
+// name, peer URLs. This is used for bootstrapping.
+func newMember(name string, peerURLs []string) *Member {
+	sort.Strings(peerURLs)
+	m := &Member{Name: name, PeerURLs: peerURLs}
+
+	b := []byte(m.Name)
+	for _, p := range m.PeerURLs {
+		b = append(b, []byte(p)...)
+	}
+
+	hash := sha1.Sum(b)
+	m.ID = int64(binary.BigEndian.Uint64(hash[:8]))
+	if m.ID < 0 {
+		m.ID = m.ID * -1
+	}
+
+	return m
+}
+
+func (m Member) storeKey() string {
+	return path.Join(machineKVPrefix, strconv.FormatUint(uint64(m.ID), 16))
+}

+ 7 - 3
etcdserver/server.go

@@ -80,7 +80,7 @@ type EtcdServer struct {
 	Node  raft.Node
 	Store store.Store
 
-	// Send specifies the send function for sending msgs to peers. Send
+	// Send specifies the send function for sending msgs to members. Send
 	// MUST NOT block. It is okay to drop messages, since clients should
 	// timeout and reissue their messages.  If Send is nil, server will
 	// panic.
@@ -94,8 +94,9 @@ type EtcdServer struct {
 	SnapCount int64 // number of entries to trigger a snapshot
 
 	// Cache of the latest raft index and raft term the server has seen
-	raftIndex int64
-	raftTerm  int64
+	raftIndex    int64
+	raftTerm     int64
+	ClusterStore ClusterStore
 }
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
@@ -107,6 +108,8 @@ func (s *EtcdServer) Start() {
 	}
 	s.w = wait.New()
 	s.done = make(chan struct{})
+	// TODO: if this is an empty log, writes all peer infos
+	// into the first entry
 	go s.run()
 }
 
@@ -130,6 +133,7 @@ func (s *EtcdServer) run() {
 			// TODO(bmizerany): do this in the background, but take
 			// care to apply entries in a single goroutine, and not
 			// race them.
+			// TODO: apply configuration change into ClusterStore.
 			for _, e := range rd.CommittedEntries {
 				switch e.Type {
 				case raftpb.EntryNormal:

+ 3 - 3
etcdserver/server_test.go

@@ -382,14 +382,14 @@ func testServer(t *testing.T, ns int64) {
 		}
 	}
 
-	peers := make([]int64, ns)
+	members := make([]int64, ns)
 	for i := int64(0); i < ns; i++ {
-		peers[i] = i + 1
+		members[i] = i + 1
 	}
 
 	for i := int64(0); i < ns; i++ {
 		id := i + 1
-		n := raft.StartNode(id, peers, 10, 1)
+		n := raft.StartNode(id, members, 10, 1)
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
 		srv := &EtcdServer{

+ 22 - 21
main.go

@@ -36,14 +36,14 @@ const (
 )
 
 var (
-	fid          = flag.String("id", "0x1", "ID of this server")
+	name         = flag.String("name", "default", "Unique human-readable name for this node")
 	timeout      = flag.Duration("timeout", 10*time.Second, "Request Timeout")
 	paddr        = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')")
 	dir          = flag.String("data-dir", "", "Path to the data directory")
 	snapCount    = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 	printVersion = flag.Bool("version", false, "Print the version and exit")
 
-	peers     = &etcdhttp.Peers{}
+	cluster   = &etcdserver.Cluster{}
 	addrs     = &Addrs{}
 	cors      = &pkg.CORSInfo{}
 	proxyFlag = new(ProxyFlag)
@@ -78,11 +78,11 @@ var (
 )
 
 func init() {
-	flag.Var(peers, "peers", "your peers")
+	flag.Var(cluster, "bootstrap-config", "Initial cluster configuration for bootstrapping")
 	flag.Var(addrs, "bind-addr", "List of HTTP service addresses (e.g., '127.0.0.1:4001,10.0.0.1:8080')")
 	flag.Var(cors, "cors", "Comma-separated white list of origins for CORS (cross-origin resource sharing).")
 	flag.Var(proxyFlag, "proxy", fmt.Sprintf("Valid values include %s", strings.Join(proxyFlagValues, ", ")))
-	peers.Set("0x1=localhost:8080")
+	cluster.Set("default=localhost:8080")
 	addrs.Set("127.0.0.1:4001")
 	proxyFlag.Set(proxyFlagValueOff)
 
@@ -122,16 +122,13 @@ func main() {
 
 // startEtcd launches the etcd server and HTTP handlers for client/server communication.
 func startEtcd() {
-	id, err := strconv.ParseInt(*fid, 0, 64)
-	if err != nil {
-		log.Fatal(err)
-	}
-	if id == raft.None {
-		log.Fatalf("etcd: cannot use None(%d) as etcdserver id", raft.None)
+	self := cluster.FindName(*name)
+	if self == nil {
+		log.Fatalf("etcd: no member with name=%q exists", *name)
 	}
 
-	if peers.Pick(id) == "" {
-		log.Fatalf("%#x=<addr> must be specified in peers", id)
+	if self.ID == raft.None {
+		log.Fatalf("etcd: cannot use None(%d) as member id", raft.None)
 	}
 
 	if *snapCount <= 0 {
@@ -139,7 +136,7 @@ func startEtcd() {
 	}
 
 	if *dir == "" {
-		*dir = fmt.Sprintf("%v_etcd_data", *fid)
+		*dir = fmt.Sprintf("%v_etcd_data", self.ID)
 		log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
 	}
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
@@ -154,6 +151,7 @@ func startEtcd() {
 	waldir := path.Join(*dir, "wal")
 	var w *wal.WAL
 	var n raft.Node
+	var err error
 	st := store.New()
 
 	if !wal.Exist(waldir) {
@@ -161,7 +159,7 @@ func startEtcd() {
 		if err != nil {
 			log.Fatal(err)
 		}
-		n = raft.StartNode(id, peers.IDs(), 10, 1)
+		n = raft.StartNode(self.ID, cluster.IDs(), 10, 1)
 	} else {
 		var index int64
 		snapshot, err := snapshotter.Load()
@@ -186,7 +184,7 @@ func startEtcd() {
 		if wid != 0 {
 			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
 		}
-		n = raft.RestartNode(id, peers.IDs(), 10, 1, snapshot, st, ents)
+		n = raft.RestartNode(self.ID, cluster.IDs(), 10, 1, snapshot, st, ents)
 	}
 
 	pt, err := transport.NewTransport(peerTLSInfo)
@@ -194,6 +192,8 @@ func startEtcd() {
 		log.Fatal(err)
 	}
 
+	cls := etcdserver.NewClusterStore(st, *cluster)
+
 	s := &etcdserver.EtcdServer{
 		Store: st,
 		Node:  n,
@@ -201,15 +201,16 @@ func startEtcd() {
 			*wal.WAL
 			*snap.Snapshotter
 		}{w, snapshotter},
-		Send:       etcdhttp.Sender(pt, *peers),
-		Ticker:     time.Tick(100 * time.Millisecond),
-		SyncTicker: time.Tick(500 * time.Millisecond),
-		SnapCount:  *snapCount,
+		Send:         etcdserver.Sender(pt, cls),
+		Ticker:       time.Tick(100 * time.Millisecond),
+		SyncTicker:   time.Tick(500 * time.Millisecond),
+		SnapCount:    *snapCount,
+		ClusterStore: cls,
 	}
 	s.Start()
 
 	ch := &pkg.CORSHandler{
-		Handler: etcdhttp.NewClientHandler(s, *peers, *timeout),
+		Handler: etcdhttp.NewClientHandler(s, cls, *timeout),
 		Info:    cors,
 	}
 	ph := etcdhttp.NewPeerHandler(s)
@@ -247,7 +248,7 @@ func startProxy() {
 		log.Fatal(err)
 	}
 
-	ph, err := proxy.NewHandler(pt, (*peers).Addrs())
+	ph, err := proxy.NewHandler(pt, (*cluster).Endpoints())
 	if err != nil {
 		log.Fatal(err)
 	}