
Merge pull request #1361 from unihorn/182

etcdserver: refactor cluster and clusterStore
Yicheng Qin, 11 years ago
commit 4089475c90

+ 173 - 82
etcdserver/cluster.go

@@ -19,131 +19,131 @@ package etcdserver
 import (
 	"crypto/sha1"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
-	"math/rand"
+	"log"
 	"net/url"
+	"path"
 	"sort"
 	"strings"
 
+	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/pkg/flags"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/store"
 )
 
+const (
+	raftAttributesSuffix = "raftAttributes"
+	attributesSuffix     = "attributes"
+)
+
+type ClusterInfo interface {
+	ID() uint64
+	ClientURLs() []string
+	Members() map[uint64]*Member
+	Member(id uint64) *Member
+}
+
 // Cluster is a list of Members that belong to the same raft cluster
 type Cluster struct {
 	id      uint64
 	name    string
 	members map[uint64]*Member
+	// removed contains the IDs of removed members in the cluster.
+	// A removed ID cannot be reused.
+	removed map[uint64]bool
+	store   store.Store
 }
 
-func NewCluster(clusterName string) *Cluster {
-	return &Cluster{name: clusterName, members: make(map[uint64]*Member)}
-}
+// NewClusterFromString returns a Cluster built from the given name and a
+// comma-separated set of name=URL pairs (command line or discovery), e.g.:
+// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach1=http://3.3.3.3,mach2=http://4.4.4.4
+func NewClusterFromString(name string, cluster string) (*Cluster, error) {
+	c := newCluster(name)
 
-func (c Cluster) FindName(name string) *Member {
-	for _, m := range c.members {
-		if m.Name == name {
-			return m
+	v, err := url.ParseQuery(strings.Replace(cluster, ",", "&", -1))
+	if err != nil {
+		return nil, err
+	}
+	for name, urls := range v {
+		if len(urls) == 0 || urls[0] == "" {
+			return nil, fmt.Errorf("Empty URL given for %q", name)
+		}
+		m := NewMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
+		if _, ok := c.members[m.ID]; ok {
+			return nil, fmt.Errorf("Member exists with identical ID %v", m)
 		}
+		c.members[m.ID] = m
 	}
-	return nil
+	c.genID()
+	return c, nil
 }
 
-func (c Cluster) FindID(id uint64) *Member {
-	return c.members[id]
-}
+func NewClusterFromStore(name string, st store.Store) *Cluster {
+	c := newCluster(name)
+	c.store = st
 
-func (c Cluster) Add(m Member) error {
-	if c.FindID(m.ID) != nil {
-		return fmt.Errorf("Member exists with identical ID %v", m)
+	e, err := c.store.Get(storeMembersPrefix, true, true)
+	if err != nil {
+		if isKeyNotFound(err) {
+			return c
+		}
+		log.Panicf("get storeMembers should never fail: %v", err)
 	}
-	c.members[m.ID] = &m
-	return nil
-}
-
-func (c *Cluster) AddSlice(mems []Member) error {
-	for _, m := range mems {
-		err := c.Add(m)
+	for _, n := range e.Node.Nodes {
+		m, err := nodeToMember(n)
 		if err != nil {
-			return err
+			log.Panicf("nodeToMember should never fail: %v", err)
 		}
+		c.members[m.ID] = m
 	}
 
-	return nil
-}
-
-// Pick chooses a random address from a given Member's addresses, and returns it as
-// an addressible URI. If the given member does not exist, an empty string is returned.
-func (c Cluster) Pick(id uint64) string {
-	if m := c.FindID(id); m != nil {
-		urls := m.PeerURLs
-		if len(urls) == 0 {
-			return ""
+	e, err = c.store.Get(storeRemovedMembersPrefix, true, true)
+	if err != nil {
+		if isKeyNotFound(err) {
+			return c
 		}
-		return urls[rand.Intn(len(urls))]
+		log.Panicf("get storeRemovedMembers should never fail: %v", err)
 	}
-
-	return ""
-}
-
-// SetMembersFromString parses a sets of names to IPs either from the command line or discovery formatted like:
-// mach0=http://1.1.1.1,mach0=http://2.2.2.2,mach0=http://1.1.1.1,mach1=http://2.2.2.2,mach1=http://3.3.3.3
-func (c *Cluster) SetMembersFromString(s string) error {
-	c.members = make(map[uint64]*Member)
-	v, err := url.ParseQuery(strings.Replace(s, ",", "&", -1))
-	if err != nil {
-		return err
+	for _, n := range e.Node.Nodes {
+		c.removed[parseMemberID(n.Key)] = true
 	}
 
-	for name, urls := range v {
-		if len(urls) == 0 || urls[0] == "" {
-			return fmt.Errorf("Empty URL given for %q", name)
-		}
-
-		m := NewMember(name, types.URLs(*flags.NewURLsValue(strings.Join(urls, ","))), c.name, nil)
-		err := c.Add(*m)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return c
 }
 
-func (c *Cluster) AddMemberFromURLs(name string, urls types.URLs) (*Member, error) {
-	m := NewMember(name, urls, c.name, nil)
-	err := c.Add(*m)
-	if err != nil {
-		return nil, err
+func newCluster(name string) *Cluster {
+	return &Cluster{
+		name:    name,
+		members: make(map[uint64]*Member),
+		removed: make(map[uint64]bool),
 	}
-	return m, nil
 }
 
-func (c *Cluster) GenID(salt []byte) {
-	mIDs := c.MemberIDs()
-	b := make([]byte, 8*len(mIDs))
-	for i, id := range mIDs {
-		binary.BigEndian.PutUint64(b[8*i:], id)
-	}
-	b = append(b, salt...)
-	hash := sha1.Sum(b)
-	c.id = binary.BigEndian.Uint64(hash[:8])
+func (c Cluster) ID() uint64 { return c.id }
+
+func (c Cluster) Members() map[uint64]*Member { return c.members }
+
+func (c *Cluster) Member(id uint64) *Member {
+	return c.members[id]
 }
 
-func (c Cluster) String() string {
-	sl := []string{}
+// MemberByName returns the Member with the given name, if one exists.
+// It panics if more than one member has the given name.
+func (c *Cluster) MemberByName(name string) *Member {
+	var memb *Member
 	for _, m := range c.members {
-		for _, u := range m.PeerURLs {
-			sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
+		if m.Name == name {
+			if memb != nil {
+				panic("two members with the given name exist in the cluster")
+			}
+			memb = m
 		}
 	}
-	sort.Strings(sl)
-	return strings.Join(sl, ",")
+	return memb
 }
 
-func (c Cluster) ID() uint64 { return c.id }
-
-func (c Cluster) Members() map[uint64]*Member { return c.members }
-
 func (c Cluster) MemberIDs() []uint64 {
 	var ids []uint64
 	for _, m := range c.members {
@@ -153,6 +153,10 @@ func (c Cluster) MemberIDs() []uint64 {
 	return ids
 }
 
+func (c *Cluster) IsIDRemoved(id uint64) bool {
+	return c.removed[id]
+}
+
 // PeerURLs returns a list of all peer addresses. Each address is prefixed
 // with the scheme (currently "http://"). The returned list is sorted in
 // ascending lexicographical order.
@@ -180,3 +184,90 @@ func (c Cluster) ClientURLs() []string {
 	sort.Strings(urls)
 	return urls
 }
+
+func (c Cluster) String() string {
+	sl := []string{}
+	for _, m := range c.members {
+		for _, u := range m.PeerURLs {
+			sl = append(sl, fmt.Sprintf("%s=%s", m.Name, u))
+		}
+	}
+	sort.Strings(sl)
+	return strings.Join(sl, ",")
+}
+
+func (c *Cluster) genID() {
+	mIDs := c.MemberIDs()
+	b := make([]byte, 8*len(mIDs))
+	for i, id := range mIDs {
+		binary.BigEndian.PutUint64(b[8*i:], id)
+	}
+	hash := sha1.Sum(b)
+	c.id = binary.BigEndian.Uint64(hash[:8])
+}
+
+func (c *Cluster) SetID(id uint64) { c.id = id }
+
+func (c *Cluster) SetStore(st store.Store) { c.store = st }
+
+// AddMember puts a new Member into the store.
+// A Member with a matching id must not exist.
+func (c *Cluster) AddMember(m *Member) {
+	b, err := json.Marshal(m.RaftAttributes)
+	if err != nil {
+		log.Panicf("marshal error: %v", err)
+	}
+	p := path.Join(memberStoreKey(m.ID), raftAttributesSuffix)
+	if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
+		log.Panicf("add raftAttributes should never fail: %v", err)
+	}
+	b, err = json.Marshal(m.Attributes)
+	if err != nil {
+		log.Panicf("marshal error: %v", err)
+	}
+	p = path.Join(memberStoreKey(m.ID), attributesSuffix)
+	if _, err := c.store.Create(p, false, string(b), false, store.Permanent); err != nil {
+		log.Panicf("add attributes should never fail: %v", err)
+	}
+	c.members[m.ID] = m
+}
+
+// RemoveMember removes a member from the store.
+// The given id MUST exist, or the function panics.
+func (c *Cluster) RemoveMember(id uint64) {
+	if _, err := c.store.Delete(memberStoreKey(id), true, true); err != nil {
+		log.Panicf("delete peer should never fail: %v", err)
+	}
+	delete(c.members, id)
+	if _, err := c.store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
+		log.Panicf("creating RemovedMember should never fail: %v", err)
+	}
+	c.removed[id] = true
+}
+
+// nodeToMember builds a member from a store node.
+// The child nodes of the given node must be sorted by key.
+func nodeToMember(n *store.NodeExtern) (*Member, error) {
+	m := &Member{ID: parseMemberID(n.Key)}
+	if len(n.Nodes) != 2 {
+		return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
+	}
+	if w := path.Join(n.Key, attributesSuffix); n.Nodes[0].Key != w {
+		return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
+	}
+	if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
+		return m, fmt.Errorf("unmarshal attributes error: %v", err)
+	}
+	if w := path.Join(n.Key, raftAttributesSuffix); n.Nodes[1].Key != w {
+		return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
+	}
+	if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
+		return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
+	}
+	return m, nil
+}
+
+func isKeyNotFound(err error) bool {
+	e, ok := err.(*etcdErr.Error)
+	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
+}
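
Taken together, these changes fold the old clusterStore behavior into Cluster itself. Below is a minimal sketch of the intended usage, assuming only the API added above; the function name, cluster name, member names, and URLs are illustrative and not part of the commit.

	package etcdserver

	import (
		"fmt"
		"log"

		"github.com/coreos/etcd/pkg/flags"
		"github.com/coreos/etcd/pkg/types"
		"github.com/coreos/etcd/store"
	)

	// exampleClusterUsage is a hypothetical walkthrough of the new Cluster API.
	func exampleClusterUsage() {
		// Bootstrap the member set from a name=URL list, as done at startup.
		cl, err := NewClusterFromString("abc",
			"mem1=http://10.0.0.1:2379,mem2=http://10.0.0.2:2379")
		if err != nil {
			log.Fatalf("bad initial cluster: %v", err)
		}
		fmt.Printf("cluster %x has %d members\n", cl.ID(), len(cl.Members()))

		// Attach a store so membership changes are persisted; AddMember and
		// RemoveMember write through to it.
		st := store.New()
		cl.SetStore(st)
		m := NewMember("mem3", types.URLs(*flags.NewURLsValue("http://10.0.0.3:2379")), "abc", nil)
		cl.AddMember(m)

		// After a restart the membership is rebuilt from the store alone; only
		// mem3 was written through the store here, so this prints 1.
		restarted := NewClusterFromStore("abc", st)
		fmt.Println(len(restarted.Members()))
	}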

+ 0 - 240
etcdserver/cluster_store.go

@@ -1,240 +0,0 @@
-/*
-   Copyright 2014 CoreOS, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package etcdserver
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"log"
-	"net/http"
-	"strconv"
-	"time"
-
-	etcdErr "github.com/coreos/etcd/error"
-
-	"github.com/coreos/etcd/etcdserver/stats"
-	"github.com/coreos/etcd/raft/raftpb"
-	"github.com/coreos/etcd/store"
-)
-
-const (
-	raftPrefix = "/raft"
-
-	raftAttributesSuffix = "/raftAttributes"
-	attributesSuffix     = "/attributes"
-)
-
-type ClusterStore interface {
-	Add(m Member)
-	Get() Cluster
-	Remove(id uint64)
-	IsRemoved(id uint64) bool
-}
-
-type clusterStore struct {
-	Store store.Store
-	// TODO: write the id into the actual store?
-	// TODO: save the id as string?
-	id          uint64
-	clusterName string
-}
-
-// Add puts a new Member into the store.
-// A Member with a matching id must not exist.
-func (s *clusterStore) Add(m Member) {
-	b, err := json.Marshal(m.RaftAttributes)
-	if err != nil {
-		log.Panicf("marshal error: %v", err)
-	}
-	if _, err := s.Store.Create(memberStoreKey(m.ID)+raftAttributesSuffix, false, string(b), false, store.Permanent); err != nil {
-		log.Panicf("add raftAttributes should never fail: %v", err)
-	}
-
-	b, err = json.Marshal(m.Attributes)
-	if err != nil {
-		log.Panicf("marshal error: %v", err)
-	}
-	if _, err := s.Store.Create(memberStoreKey(m.ID)+attributesSuffix, false, string(b), false, store.Permanent); err != nil {
-		log.Panicf("add attributes should never fail: %v", err)
-	}
-}
-
-// TODO(philips): keep the latest copy without going to the store to avoid the
-// lock here.
-func (s *clusterStore) Get() Cluster {
-	c := NewCluster(s.clusterName)
-	c.id = s.id
-	e, err := s.Store.Get(storeMembersPrefix, true, true)
-	if err != nil {
-		if isKeyNotFound(err) {
-			return *c
-		}
-		log.Panicf("get member should never fail: %v", err)
-	}
-	for _, n := range e.Node.Nodes {
-		m, err := nodeToMember(n)
-		if err != nil {
-			log.Panicf("unexpected nodeToMember error: %v", err)
-		}
-		if err := c.Add(m); err != nil {
-			log.Panicf("add member to cluster should never fail: %v", err)
-		}
-	}
-	return *c
-}
-
-// nodeToMember builds member through a store node.
-// the child nodes of the given node should be sorted by key.
-func nodeToMember(n *store.NodeExtern) (Member, error) {
-	m := Member{ID: parseMemberID(n.Key)}
-	if len(n.Nodes) != 2 {
-		return m, fmt.Errorf("len(nodes) = %d, want 2", len(n.Nodes))
-	}
-	if w := n.Key + attributesSuffix; n.Nodes[0].Key != w {
-		return m, fmt.Errorf("key = %v, want %v", n.Nodes[0].Key, w)
-	}
-	if err := json.Unmarshal([]byte(*n.Nodes[0].Value), &m.Attributes); err != nil {
-		return m, fmt.Errorf("unmarshal attributes error: %v", err)
-	}
-	if w := n.Key + raftAttributesSuffix; n.Nodes[1].Key != w {
-		return m, fmt.Errorf("key = %v, want %v", n.Nodes[1].Key, w)
-	}
-	if err := json.Unmarshal([]byte(*n.Nodes[1].Value), &m.RaftAttributes); err != nil {
-		return m, fmt.Errorf("unmarshal raftAttributes error: %v", err)
-	}
-	return m, nil
-}
-
-// Remove removes a member from the store.
-// The given id MUST exist.
-func (s *clusterStore) Remove(id uint64) {
-	if _, err := s.Store.Delete(memberStoreKey(id), true, true); err != nil {
-		log.Panicf("delete peer should never fail: %v", err)
-	}
-	if _, err := s.Store.Create(removedMemberStoreKey(id), false, "", false, store.Permanent); err != nil {
-		log.Panicf("creating RemovedMember should never fail: %v", err)
-	}
-}
-
-func (s *clusterStore) IsRemoved(id uint64) bool {
-	_, err := s.Store.Get(removedMemberStoreKey(id), false, false)
-	switch {
-	case err == nil:
-		return true
-	case isKeyNotFound(err):
-		return false
-	default:
-		log.Panicf("unexpected error when getting removed member %x: %v", id, err)
-		return false
-	}
-}
-
-// Sender creates the default production sender used to transport raft messages
-// in the cluster. The returned sender will update the given ServerStats and
-// LeaderStats appropriately.
-func Sender(t *http.Transport, cls ClusterStore, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
-	c := &http.Client{Transport: t}
-
-	return func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			// TODO: reuse go routines
-			// limit the number of outgoing connections for the same receiver
-			go send(c, cls, m, ss, ls)
-		}
-	}
-}
-
-// send uses the given client to send a message to a member in the given
-// ClusterStore, retrying up to 3 times for each message. The given
-// ServerStats and LeaderStats are updated appropriately
-func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
-	cid := cls.Get().ID()
-	// TODO (xiangli): reasonable retry logic
-	for i := 0; i < 3; i++ {
-		u := cls.Get().Pick(m.To)
-		if u == "" {
-			// TODO: unknown peer id.. what do we do? I
-			// don't think his should ever happen, need to
-			// look into this further.
-			log.Printf("etcdhttp: no addr for %d", m.To)
-			return
-		}
-		u = fmt.Sprintf("%s%s", u, raftPrefix)
-
-		// TODO: don't block. we should be able to have 1000s
-		// of messages out at a time.
-		data, err := m.Marshal()
-		if err != nil {
-			log.Println("etcdhttp: dropping message:", err)
-			return // drop bad message
-		}
-		if m.Type == raftpb.MsgApp {
-			ss.SendAppendReq(len(data))
-		}
-		to := idAsHex(m.To)
-		fs := ls.Follower(to)
-
-		start := time.Now()
-		sent := httpPost(c, u, cid, data)
-		end := time.Now()
-		if sent {
-			fs.Succ(end.Sub(start))
-			return
-		}
-		fs.Fail()
-		// TODO: backoff
-	}
-}
-
-// httpPost POSTs a data payload to a url using the given client. Returns true
-// if the POST succeeds, false on any failure.
-func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
-	req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
-	if err != nil {
-		// TODO: log the error?
-		return false
-	}
-	req.Header.Set("Content-Type", "application/protobuf")
-	req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))
-	resp, err := c.Do(req)
-	if err != nil {
-		// TODO: log the error?
-		return false
-	}
-	resp.Body.Close()
-
-	switch resp.StatusCode {
-	case http.StatusPreconditionFailed:
-		// TODO: shutdown the etcdserver gracefully?
-		log.Fatalf("etcd: conflicting cluster ID with the target cluster (%s != %s). Exiting.", resp.Header.Get("X-Etcd-Cluster-ID"), strconv.FormatUint(cid, 16))
-		return false
-	case http.StatusForbidden:
-		// TODO: stop the server
-		log.Fatalf("etcd: this member has been permanently removed from the cluster. Exiting.")
-		return false
-	case http.StatusNoContent:
-		return true
-	default:
-		return false
-	}
-}
-
-func isKeyNotFound(err error) bool {
-	e, ok := err.(*etcdErr.Error)
-	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
-}

+ 0 - 239
etcdserver/cluster_store_test.go

@@ -1,239 +0,0 @@
-/*
-   Copyright 2014 CoreOS, Inc.
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-*/
-
-package etcdserver
-
-import (
-	"path"
-	"reflect"
-	"testing"
-	"time"
-
-	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
-)
-
-func TestClusterStoreAdd(t *testing.T) {
-	st := &storeRecorder{}
-	ps := &clusterStore{Store: st}
-	ps.Add(newTestMember(1, nil, "node1", nil))
-
-	wactions := []action{
-		{
-			name: "Create",
-			params: []interface{}{
-				path.Join(storeMembersPrefix, "1", "raftAttributes"),
-				false,
-				`{"PeerURLs":null}`,
-				false,
-				store.Permanent,
-			},
-		},
-		{
-			name: "Create",
-			params: []interface{}{
-				path.Join(storeMembersPrefix, "1", "attributes"),
-				false,
-				`{"Name":"node1"}`,
-				false,
-				store.Permanent,
-			},
-		},
-	}
-	if g := st.Action(); !reflect.DeepEqual(g, wactions) {
-		t.Errorf("actions = %v, want %v", g, wactions)
-	}
-}
-
-func TestClusterStoreGet(t *testing.T) {
-	tests := []struct {
-		mems  []Member
-		wmems []Member
-	}{
-		{
-			[]Member{newTestMember(1, nil, "node1", nil)},
-			[]Member{newTestMember(1, nil, "node1", nil)},
-		},
-		{
-			[]Member{},
-			[]Member{},
-		},
-		{
-			[]Member{
-				newTestMember(1, nil, "node1", nil),
-				newTestMember(2, nil, "node2", nil),
-			},
-			[]Member{
-				newTestMember(1, nil, "node1", nil),
-				newTestMember(2, nil, "node2", nil),
-			},
-		},
-		{
-			[]Member{
-				newTestMember(2, nil, "node2", nil),
-				newTestMember(1, nil, "node1", nil),
-			},
-			[]Member{
-				newTestMember(1, nil, "node1", nil),
-				newTestMember(2, nil, "node2", nil),
-			},
-		},
-	}
-	for i, tt := range tests {
-		c := NewCluster("")
-		if err := c.AddSlice(tt.mems); err != nil {
-			t.Fatal(err)
-		}
-		c.GenID(nil)
-		cs := &clusterStore{Store: newGetAllStore(), id: c.id}
-		for _, m := range tt.mems {
-			cs.Add(m)
-		}
-		if g := cs.Get(); !reflect.DeepEqual(&g, c) {
-			t.Errorf("#%d: mems = %v, want %v", i, &g, c)
-		}
-	}
-}
-
-func TestClusterStoreRemove(t *testing.T) {
-	st := &storeRecorder{}
-	cs := &clusterStore{Store: st}
-	cs.Remove(1)
-
-	wactions := []action{
-		{name: "Delete", params: []interface{}{memberStoreKey(1), true, true}},
-		{name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
-	}
-	if !reflect.DeepEqual(st.Action(), wactions) {
-		t.Errorf("actions = %v, want %v", st.Action(), wactions)
-	}
-}
-
-func TestClusterStoreIsRemovedFalse(t *testing.T) {
-	st := &errStoreRecorder{err: etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)}
-	cs := clusterStore{Store: st}
-	if ok := cs.IsRemoved(1); ok != false {
-		t.Errorf("IsRemoved = %v, want %v", ok, false)
-	}
-}
-
-func TestClusterStoreIsRemovedTrue(t *testing.T) {
-	st := &storeRecorder{}
-	cs := &clusterStore{Store: st}
-	if ok := cs.IsRemoved(1); ok != true {
-		t.Errorf("IsRemoved = %v, want %v", ok, true)
-	}
-	wactions := []action{
-		{name: "Get", params: []interface{}{removedMemberStoreKey(1), false, false}},
-	}
-	if !reflect.DeepEqual(st.Action(), wactions) {
-		t.Errorf("actions = %v, want %v", st.Action(), wactions)
-	}
-}
-
-func TestNodeToMemberFail(t *testing.T) {
-	tests := []*store.NodeExtern{
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/strange"},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp("garbage")},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
-			{Key: "/1234/strange"},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
-			{Key: "/1234/static", Value: stringp("garbage")},
-		}},
-		{Key: "/1234", Nodes: []*store.NodeExtern{
-			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
-			{Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
-			{Key: "/1234/strange"},
-		}},
-	}
-	for i, tt := range tests {
-		if _, err := nodeToMember(tt); err == nil {
-			t.Errorf("#%d: unexpected nil error", i)
-		}
-	}
-}
-
-func TestNodeToMember(t *testing.T) {
-	n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{
-		{Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
-		{Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)},
-	}}
-	wm := Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}}
-	m, err := nodeToMember(n)
-	if err != nil {
-		t.Fatalf("unexpected nodeToMember error: %v", err)
-	}
-	if !reflect.DeepEqual(m, wm) {
-		t.Errorf("member = %+v, want %+v", m, wm)
-	}
-}
-
-// simpleStore implements basic create and get.
-type simpleStore struct {
-	storeRecorder
-	st map[string]string
-}
-
-func (s *simpleStore) Create(key string, _ bool, value string, _ bool, _ time.Time) (*store.Event, error) {
-	if s.st == nil {
-		s.st = make(map[string]string)
-	}
-	s.st[key] = value
-	return nil, nil
-}
-func (s *simpleStore) Get(key string, _, _ bool) (*store.Event, error) {
-	val, ok := s.st[key]
-	if !ok {
-		return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, "", 0)
-	}
-	ev := &store.Event{Node: &store.NodeExtern{Key: key, Value: stringp(val)}}
-	return ev, nil
-}
-
-// getAllStore embeds simpleStore, and makes Get return all keys sorted.
-// It uses real store because it uses lots of logic in store and is not easy
-// to mock.
-// TODO: use mock one to do testing
-type getAllStore struct {
-	store.Store
-}
-
-func newGetAllStore() *getAllStore {
-	return &getAllStore{store.New()}
-}
-
-func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
-	return Member{
-		ID:             id,
-		RaftAttributes: RaftAttributes{PeerURLs: peerURLs},
-		Attributes:     Attributes{Name: name, ClientURLs: clientURLs},
-	}
-}
-
-func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member {
-	m := newTestMember(id, peerURLs, name, clientURLs)
-	return &m
-}

+ 224 - 194
etcdserver/cluster_test.go

@@ -17,254 +17,156 @@
 package etcdserver
 
 import (
+	"path"
 	"reflect"
 	"testing"
+
+	"github.com/coreos/etcd/store"
 )
 
-func TestClusterAddSlice(t *testing.T) {
+func TestClusterFromString(t *testing.T) {
 	tests := []struct {
+		f    string
 		mems []Member
-		want *Cluster
 	}{
 		{
-			[]Member{},
-			NewCluster(""),
-		},
-		{
+			"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
 			[]Member{
-				newTestMember(1, []string{"foo", "bar"}, "", nil),
-				newTestMember(2, []string{"baz"}, "", nil),
-			},
-			&Cluster{
-				members: map[uint64]*Member{
-					1: newTestMemberp(1, []string{"foo", "bar"}, "", nil),
-					2: newTestMemberp(2, []string{"baz"}, "", nil),
-				},
+				newTestMember(4322322643958477905, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
+				newTestMember(3141198903430435750, []string{"http://10.0.0.2:2379"}, "mem2", nil),
+				newTestMember(12762790032478827328, []string{"http://127.0.0.1:2379"}, "default", nil),
 			},
 		},
 	}
 	for i, tt := range tests {
-		c := NewCluster("")
-		if err := c.AddSlice(tt.mems); err != nil {
-			t.Errorf("#%d: err=%#v, want nil", i, err)
-			continue
+		c, err := NewClusterFromString("abc", tt.f)
+		if err != nil {
+			t.Fatalf("#%d: unexpected new error: %v", i, err)
 		}
-		if !reflect.DeepEqual(c, tt.want) {
-			t.Errorf("#%d: c=%#v, want %#v", i, c, tt.want)
+		if c.name != "abc" {
+			t.Errorf("#%d: name = %v, want abc", i, c.name)
+		}
+		wc := newTestCluster(tt.mems)
+		if !reflect.DeepEqual(c.members, wc.members) {
+			t.Errorf("#%d: members = %+v, want %+v", i, c.members, wc.members)
 		}
 	}
 }
 
-func TestClusterAddSliceBad(t *testing.T) {
-	c := Cluster{
-		members: map[uint64]*Member{
-			1: newTestMemberp(1, nil, "", nil),
-		},
-	}
-	if err := c.AddSlice([]Member{newTestMember(1, nil, "", nil)}); err == nil {
-		t.Error("want err, but got nil")
-	}
-}
-
-func TestClusterPick(t *testing.T) {
-	cs := Cluster{
-		members: map[uint64]*Member{
-			1: newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
-			2: newTestMemberp(2, []string{"xyz"}, "", nil),
-			3: newTestMemberp(3, []string{}, "", nil),
-		},
-	}
-	ids := map[string]bool{
-		"abc": true,
-		"def": true,
-		"ghi": true,
-		"jkl": true,
-		"mno": true,
-		"pqr": true,
-		"stu": true,
+func TestClusterFromStringBad(t *testing.T) {
+	tests := []string{
+		// invalid URL
+		"%^",
+		// no URL defined for member
+		"mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
+		"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
+		// TODO(philips): anyone know of a 64 bit sha1 hash collision
+		// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
+		// the same url for two members
+		"mem1=http://128.193.4.20:2379,mem2=http://128.193.4.20:2379",
 	}
-	for i := 0; i < 1000; i++ {
-		a := cs.Pick(1)
-		if !ids[a] {
-			t.Errorf("returned ID %q not in expected range!", a)
-			break
+	for i, tt := range tests {
+		if _, err := NewClusterFromString("abc", tt); err == nil {
+			t.Errorf("#%d: unexpected successful new, want err", i)
 		}
 	}
-	if b := cs.Pick(2); b != "xyz" {
-		t.Errorf("id=%q, want %q", b, "xyz")
-	}
-	if c := cs.Pick(3); c != "" {
-		t.Errorf("id=%q, want %q", c, "")
-	}
-	if d := cs.Pick(4); d != "" {
-		t.Errorf("id=%q, want %q", d, "")
-	}
 }
 
-func TestClusterFind(t *testing.T) {
+func TestClusterFromStore(t *testing.T) {
 	tests := []struct {
-		id    uint64
-		name  string
-		mems  []Member
-		match bool
+		mems []Member
 	}{
 		{
-			1,
-			"node1",
 			[]Member{newTestMember(1, nil, "node1", nil)},
-			true,
 		},
 		{
-			2,
-			"foobar",
 			[]Member{},
-			false,
 		},
 		{
-			2,
-			"node2",
-			[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
-			true,
-		},
-		{
-			3,
-			"node3",
-			[]Member{newTestMember(1, nil, "node1", nil), newTestMember(2, nil, "node2", nil)},
-			false,
+			[]Member{
+				newTestMember(1, nil, "node1", nil),
+				newTestMember(2, nil, "node2", nil),
+			},
 		},
 	}
 	for i, tt := range tests {
-		c := NewCluster("")
-		c.AddSlice(tt.mems)
-
-		m := c.FindID(tt.id)
-		if m == nil && !tt.match {
-			continue
+		st := store.New()
+		hc := newTestCluster(nil)
+		hc.SetStore(st)
+		for i := range tt.mems {
+			hc.AddMember(&tt.mems[i])
 		}
-		if m == nil && tt.match {
-			t.Errorf("#%d: expected match got empty", i)
+		c := NewClusterFromStore("abc", st)
+		if c.name != "abc" {
+			t.Errorf("#%d: name = %v, want %v", i, c.name, "abc")
 		}
-		if m.Name != tt.name && tt.match {
-			t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.name)
-		}
-	}
-
-	for i, tt := range tests {
-		c := NewCluster("")
-		c.AddSlice(tt.mems)
-
-		m := c.FindID(tt.id)
-		if m == nil && !tt.match {
-			continue
-		}
-		if m == nil && tt.match {
-			t.Errorf("#%d: expected match got empty", i)
-		}
-		if m.ID != tt.id && tt.match {
-			t.Errorf("#%d: got = %v, want %v", i, m.Name, tt.id)
+		wc := newTestCluster(tt.mems)
+		if !reflect.DeepEqual(c.members, wc.members) {
+			t.Errorf("#%d: members = %v, want %v", i, c.members, wc.members)
 		}
 	}
 }
 
-func TestClusterSet(t *testing.T) {
+func TestClusterMember(t *testing.T) {
+	membs := []Member{
+		newTestMember(1, nil, "node1", nil),
+		newTestMember(2, nil, "node2", nil),
+	}
 	tests := []struct {
-		f    string
-		mems []Member
+		id    uint64
+		match bool
 	}{
-		{
-			"mem1=http://10.0.0.1:2379,mem1=http://128.193.4.20:2379,mem2=http://10.0.0.2:2379,default=http://127.0.0.1:2379",
-			[]Member{
-				newTestMember(3736794188555456841, []string{"http://10.0.0.1:2379", "http://128.193.4.20:2379"}, "mem1", nil),
-				newTestMember(5674507346857578431, []string{"http://10.0.0.2:2379"}, "mem2", nil),
-				newTestMember(2676999861503984872, []string{"http://127.0.0.1:2379"}, "default", nil),
-			},
-		},
+		{1, true},
+		{2, true},
+		{3, false},
 	}
 	for i, tt := range tests {
-		c := NewCluster("")
-		if err := c.AddSlice(tt.mems); err != nil {
-			t.Error(err)
+		c := newTestCluster(membs)
+		m := c.Member(tt.id)
+		if g := m != nil; g != tt.match {
+			t.Errorf("#%d: find member = %v, want %v", i, g, tt.match)
 		}
-
-		g := Cluster{}
-		g.SetMembersFromString(tt.f)
-
-		if g.String() != c.String() {
-			t.Errorf("#%d: set = %v, want %v", i, g, c)
+		if m != nil && m.ID != tt.id {
+			t.Errorf("#%d: id = %x, want %x", i, m.ID, tt.id)
 		}
 	}
 }
 
-func TestClusterGenID(t *testing.T) {
-	cs := NewCluster("")
-	cs.AddSlice([]Member{
-		newTestMember(1, nil, "", nil),
-		newTestMember(2, nil, "", nil),
-	})
-
-	cs.GenID(nil)
-	if cs.ID() == 0 {
-		t.Fatalf("cluster.ID = %v, want not 0", cs.ID())
-	}
-	previd := cs.ID()
-
-	cs.Add(newTestMember(3, nil, "", nil))
-	cs.GenID(nil)
-	if cs.ID() == previd {
-		t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
-	}
-	previd = cs.ID()
-
-	cs.GenID([]byte("http://discovery.etcd.io/12345678"))
-	if cs.ID() == previd {
-		t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
+func TestClusterMemberByName(t *testing.T) {
+	membs := []Member{
+		newTestMember(1, nil, "node1", nil),
+		newTestMember(2, nil, "node2", nil),
 	}
-}
-
-func TestClusterSetBad(t *testing.T) {
-	tests := []string{
-		// invalid URL
-		"%^",
-		// no URL defined for member
-		"mem1=,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
-		"mem1,mem2=http://128.193.4.20:2379,mem3=http://10.0.0.2:2379",
-		// TODO(philips): anyone know of a 64 bit sha1 hash collision
-		// "06b2f82fd81b2c20=http://128.193.4.20:2379,02c60cb75083ceef=http://128.193.4.20:2379",
+	tests := []struct {
+		name  string
+		match bool
+	}{
+		{"node1", true},
+		{"node2", true},
+		{"node3", false},
 	}
 	for i, tt := range tests {
-		g := NewCluster("")
-		if err := g.SetMembersFromString(tt); err == nil {
-			t.Errorf("#%d: set = %v, want err", i, tt)
+		c := newTestCluster(membs)
+		m := c.MemberByName(tt.name)
+		if g := m != nil; g != tt.match {
+			t.Errorf("#%d: find member = %v, want %v", i, g, tt.match)
+		}
+		if m != nil && m.Name != tt.name {
+			t.Errorf("#%d: name = %v, want %v", i, m.Name, tt.name)
 		}
 	}
 }
 
 func TestClusterMemberIDs(t *testing.T) {
-	cs := NewCluster("")
-	cs.AddSlice([]Member{
+	c := newTestCluster([]Member{
 		newTestMember(1, nil, "", nil),
 		newTestMember(4, nil, "", nil),
 		newTestMember(100, nil, "", nil),
 	})
 	w := []uint64{1, 4, 100}
-	g := cs.MemberIDs()
+	g := c.MemberIDs()
 	if !reflect.DeepEqual(w, g) {
-		t.Errorf("IDs=%+v, want %+v", g, w)
-	}
-}
-
-func TestClusterAddBad(t *testing.T) {
-	// Should not be possible to add the same ID multiple times
-	mems := []Member{
-		newTestMember(1, nil, "mem1", nil),
-		newTestMember(1, nil, "mem2", nil),
-	}
-	c := NewCluster("")
-	c.Add(newTestMember(1, nil, "mem1", nil))
-	for i, m := range mems {
-		if err := c.Add(m); err == nil {
-			t.Errorf("#%d: set = %v, want err", i, err)
-		}
+		t.Errorf("IDs = %+v, want %+v", g, w)
 	}
 }
 
@@ -315,11 +217,7 @@ func TestClusterPeerURLs(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		c := NewCluster("")
-		if err := c.AddSlice(tt.mems); err != nil {
-			t.Errorf("AddSlice error: %v", err)
-			continue
-		}
+		c := newTestCluster(tt.mems)
 		urls := c.PeerURLs()
 		if !reflect.DeepEqual(urls, tt.wurls) {
 			t.Errorf("#%d: PeerURLs = %v, want %v", i, urls, tt.wurls)
@@ -374,14 +272,146 @@ func TestClusterClientURLs(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		c := NewCluster("")
-		if err := c.AddSlice(tt.mems); err != nil {
-			t.Errorf("AddSlice error: %v", err)
-			continue
-		}
+		c := newTestCluster(tt.mems)
 		urls := c.ClientURLs()
 		if !reflect.DeepEqual(urls, tt.wurls) {
 			t.Errorf("#%d: ClientURLs = %v, want %v", i, urls, tt.wurls)
 		}
 	}
 }
+
+func TestClusterGenID(t *testing.T) {
+	cs := newTestCluster([]Member{
+		newTestMember(1, nil, "", nil),
+		newTestMember(2, nil, "", nil),
+	})
+
+	cs.genID()
+	if cs.ID() == 0 {
+		t.Fatalf("cluster.ID = %v, want not 0", cs.ID())
+	}
+	previd := cs.ID()
+
+	cs.SetStore(&storeRecorder{})
+	cs.AddMember(newTestMemberp(3, nil, "", nil))
+	cs.genID()
+	if cs.ID() == previd {
+		t.Fatalf("cluster.ID = %v, want not %v", cs.ID(), previd)
+	}
+}
+
+func TestNodeToMemberBad(t *testing.T) {
+	tests := []*store.NodeExtern{
+		{Key: "/1234", Nodes: []*store.NodeExtern{
+			{Key: "/1234/strange"},
+		}},
+		{Key: "/1234", Nodes: []*store.NodeExtern{
+			{Key: "/1234/dynamic", Value: stringp("garbage")},
+		}},
+		{Key: "/1234", Nodes: []*store.NodeExtern{
+			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
+		}},
+		{Key: "/1234", Nodes: []*store.NodeExtern{
+			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
+			{Key: "/1234/strange"},
+		}},
+		{Key: "/1234", Nodes: []*store.NodeExtern{
+			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
+			{Key: "/1234/static", Value: stringp("garbage")},
+		}},
+		{Key: "/1234", Nodes: []*store.NodeExtern{
+			{Key: "/1234/dynamic", Value: stringp(`{"PeerURLs":null}`)},
+			{Key: "/1234/static", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
+			{Key: "/1234/strange"},
+		}},
+	}
+	for i, tt := range tests {
+		if _, err := nodeToMember(tt); err == nil {
+			t.Errorf("#%d: unexpected nil error", i)
+		}
+	}
+}
+
+func TestClusterAddMember(t *testing.T) {
+	st := &storeRecorder{}
+	c := newTestCluster(nil)
+	c.SetStore(st)
+	c.AddMember(newTestMemberp(1, nil, "node1", nil))
+
+	wactions := []action{
+		{
+			name: "Create",
+			params: []interface{}{
+				path.Join(storeMembersPrefix, "1", "raftAttributes"),
+				false,
+				`{"PeerURLs":null}`,
+				false,
+				store.Permanent,
+			},
+		},
+		{
+			name: "Create",
+			params: []interface{}{
+				path.Join(storeMembersPrefix, "1", "attributes"),
+				false,
+				`{"Name":"node1"}`,
+				false,
+				store.Permanent,
+			},
+		},
+	}
+	if g := st.Action(); !reflect.DeepEqual(g, wactions) {
+		t.Errorf("actions = %v, want %v", g, wactions)
+	}
+}
+
+func TestClusterRemoveMember(t *testing.T) {
+	st := &storeRecorder{}
+	c := newTestCluster(nil)
+	c.SetStore(st)
+	c.RemoveMember(1)
+
+	wactions := []action{
+		{name: "Delete", params: []interface{}{memberStoreKey(1), true, true}},
+		{name: "Create", params: []interface{}{removedMemberStoreKey(1), false, "", false, store.Permanent}},
+	}
+	if !reflect.DeepEqual(st.Action(), wactions) {
+		t.Errorf("actions = %v, want %v", st.Action(), wactions)
+	}
+}
+
+func TestNodeToMember(t *testing.T) {
+	n := &store.NodeExtern{Key: "/1234", Nodes: []*store.NodeExtern{
+		{Key: "/1234/attributes", Value: stringp(`{"Name":"node1","ClientURLs":null}`)},
+		{Key: "/1234/raftAttributes", Value: stringp(`{"PeerURLs":null}`)},
+	}}
+	wm := &Member{ID: 0x1234, RaftAttributes: RaftAttributes{}, Attributes: Attributes{Name: "node1"}}
+	m, err := nodeToMember(n)
+	if err != nil {
+		t.Fatalf("unexpected nodeToMember error: %v", err)
+	}
+	if !reflect.DeepEqual(m, wm) {
+		t.Errorf("member = %+v, want %+v", m, wm)
+	}
+}
+
+func newTestCluster(membs []Member) *Cluster {
+	c := &Cluster{members: make(map[uint64]*Member), removed: make(map[uint64]bool)}
+	for i, m := range membs {
+		c.members[m.ID] = &membs[i]
+	}
+	return c
+}
+
+func newTestMember(id uint64, peerURLs []string, name string, clientURLs []string) Member {
+	return Member{
+		ID:             id,
+		RaftAttributes: RaftAttributes{PeerURLs: peerURLs},
+		Attributes:     Attributes{Name: name, ClientURLs: clientURLs},
+	}
+}
+
+func newTestMemberp(id uint64, peerURLs []string, name string, clientURLs []string) *Member {
+	m := newTestMember(id, peerURLs, name, clientURLs)
+	return &m
+}

+ 1 - 1
etcdserver/config.go

@@ -40,7 +40,7 @@ type ServerConfig struct {
 // VerifyBootstrapConfig sanity-checks the initial config and returns an error
 // for things that should never happen.
 func (c *ServerConfig) VerifyBootstrapConfig() error {
-	m := c.Cluster.FindName(c.Name)
+	m := c.Cluster.MemberByName(c.Name)
 	// Make sure the cluster at least contains the local server.
 	if m == nil {
 		return fmt.Errorf("couldn't find local name %s in the initial cluster configuration", c.Name)

+ 1 - 2
etcdserver/config_test.go

@@ -44,8 +44,7 @@ func TestBootstrapConfigVerify(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		cluster := &Cluster{}
-		err := cluster.SetMembersFromString(tt.clusterSetting)
+		cluster, err := NewClusterFromString("", tt.clusterSetting)
 		if err != nil && tt.shouldError {
 			continue
 		}

+ 20 - 20
etcdserver/etcdhttp/http.go

@@ -59,12 +59,12 @@ var errClosed = errors.New("etcdhttp: client closed connection")
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
 func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 	sh := &serverHandler{
-		server:       server,
-		clusterStore: server.ClusterStore,
-		stats:        server,
-		timer:        server,
-		timeout:      defaultServerTimeout,
-		clock:        clockwork.NewRealClock(),
+		server:      server,
+		clusterInfo: server.Cluster,
+		stats:       server,
+		timer:       server,
+		timeout:     defaultServerTimeout,
+		clock:       clockwork.NewRealClock(),
 	}
 	mux := http.NewServeMux()
 	mux.HandleFunc(keysPrefix, sh.serveKeys)
@@ -84,10 +84,10 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
 // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
 func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 	sh := &serverHandler{
-		server:       server,
-		stats:        server,
-		clusterStore: server.ClusterStore,
-		clock:        clockwork.NewRealClock(),
+		server:      server,
+		stats:       server,
+		clusterInfo: server.Cluster,
+		clock:       clockwork.NewRealClock(),
 	}
 	mux := http.NewServeMux()
 	mux.HandleFunc(raftPrefix, sh.serveRaft)
@@ -97,12 +97,12 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
 
 // serverHandler provides http.Handlers for etcd client and raft communication.
 type serverHandler struct {
-	timeout      time.Duration
-	server       etcdserver.Server
-	stats        etcdserver.Stats
-	timer        etcdserver.RaftTimer
-	clusterStore etcdserver.ClusterStore
-	clock        clockwork.Clock
+	timeout     time.Duration
+	server      etcdserver.Server
+	stats       etcdserver.Stats
+	timer       etcdserver.RaftTimer
+	clusterInfo etcdserver.ClusterInfo
+	clock       clockwork.Clock
 }
 
 func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
@@ -145,7 +145,7 @@ func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r.Method, "GET", "HEAD") {
 		return
 	}
-	endpoints := h.clusterStore.Get().ClientURLs()
+	endpoints := h.clusterInfo.ClientURLs()
 	w.Write([]byte(strings.Join(endpoints, ", ")))
 }
 
@@ -160,7 +160,7 @@ func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request)
 	case "GET":
 		idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
 		if idStr == "" {
-			msmap := h.clusterStore.Get().Members()
+			msmap := h.clusterInfo.Members()
 			ms := make(SortableMemberSlice, 0, len(msmap))
 			for _, m := range msmap {
 				ms = append(ms, m)
@@ -177,7 +177,7 @@ func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request)
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
 		}
-		m := h.clusterStore.Get().FindID(id)
+		m := h.clusterInfo.Member(id)
 		if m == nil {
 			http.Error(w, "member not found", http.StatusNotFound)
 			return
@@ -267,7 +267,7 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	wcid := strconv.FormatUint(h.clusterStore.Get().ID(), 16)
+	wcid := strconv.FormatUint(h.clusterInfo.ID(), 16)
 	w.Header().Set("X-Etcd-Cluster-ID", wcid)
 
 	gcid := r.Header.Get("X-Etcd-Cluster-ID")

+ 21 - 32
etcdserver/etcdhttp/http_test.go

@@ -610,7 +610,7 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
 		{"POST", http.StatusMethodNotAllowed},
 	}
 
-	m := NewClientHandler(&etcdserver.EtcdServer{ClusterStore: &fakeCluster{}})
+	m := NewClientHandler(&etcdserver.EtcdServer{Cluster: &etcdserver.Cluster{}})
 	s := httptest.NewServer(m)
 	defer s.Close()
 
@@ -632,19 +632,14 @@ func TestV2DeprecatedMachinesEndpoint(t *testing.T) {
 
 func TestServeMachines(t *testing.T) {
 	cluster := &fakeCluster{
-		members: []etcdserver.Member{
-			{ID: 0xBEEF0, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}},
-			{ID: 0xBEEF1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}},
-			{ID: 0xBEEF2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8082"}}},
-		},
+		clientURLs: []string{"http://localhost:8080", "http://localhost:8081", "http://localhost:8082"},
 	}
-
 	writer := httptest.NewRecorder()
 	req, err := http.NewRequest("GET", "", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	h := &serverHandler{clusterStore: cluster}
+	h := &serverHandler{clusterInfo: cluster}
 	h.serveMachines(writer, req)
 	w := "http://localhost:8080, http://localhost:8081, http://localhost:8082"
 	if g := writer.Body.String(); g != w {
@@ -981,9 +976,9 @@ func TestServeRaft(t *testing.T) {
 		}
 		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
 		h := &serverHandler{
-			timeout:      time.Hour,
-			server:       &errServer{tt.serverErr},
-			clusterStore: &fakeCluster{},
+			timeout:     time.Hour,
+			server:      &errServer{tt.serverErr},
+			clusterInfo: &fakeCluster{id: 0},
 		}
 		rw := httptest.NewRecorder()
 		h.serveRaft(rw, req)
@@ -1538,24 +1533,23 @@ func (s *serverRecorder) RemoveMember(_ context.Context, id uint64) error {
 }
 
 func TestServeAdminMembersGet(t *testing.T) {
+	memb1 := etcdserver.Member{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}}
+	memb2 := etcdserver.Member{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}}
 	cluster := &fakeCluster{
-		members: []etcdserver.Member{
-			{ID: 1, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8080"}}},
-			{ID: 2, Attributes: etcdserver.Attributes{ClientURLs: []string{"http://localhost:8081"}}},
-		},
+		members: map[uint64]*etcdserver.Member{1: &memb1, 2: &memb2},
 	}
 	h := &serverHandler{
-		server:       &serverRecorder{},
-		clock:        clockwork.NewFakeClock(),
-		clusterStore: cluster,
+		server:      &serverRecorder{},
+		clock:       clockwork.NewFakeClock(),
+		clusterInfo: cluster,
 	}
 
-	msb, err := json.Marshal(cluster.members)
+	msb, err := json.Marshal([]etcdserver.Member{memb1, memb2})
 	if err != nil {
 		t.Fatal(err)
 	}
 	wms := string(msb) + "\n"
-	mb, err := json.Marshal(cluster.members[0])
+	mb, err := json.Marshal(memb1)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -1750,17 +1744,12 @@ func TestTrimNodeExternPrefix(t *testing.T) {
 }
 
 type fakeCluster struct {
-	members []etcdserver.Member
+	id         uint64
+	clientURLs []string
+	members    map[uint64]*etcdserver.Member
 }
 
-func (c *fakeCluster) Add(m etcdserver.Member) { return }
-
-func (c *fakeCluster) Get() etcdserver.Cluster {
-	cl := etcdserver.NewCluster("")
-	cl.AddSlice(c.members)
-	return *cl
-}
-
-func (c *fakeCluster) Remove(id uint64) { return }
-
-func (c *fakeCluster) IsRemoved(id uint64) bool { return false }
+func (c *fakeCluster) ID() uint64                             { return c.id }
+func (c *fakeCluster) ClientURLs() []string                   { return c.clientURLs }
+func (c *fakeCluster) Members() map[uint64]*etcdserver.Member { return c.members }
+func (c *fakeCluster) Member(id uint64) *etcdserver.Member    { return c.members[id] }

+ 10 - 0
etcdserver/member.go

@@ -21,6 +21,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"log"
+	"math/rand"
 	"path"
 	"sort"
 	"strconv"
@@ -71,6 +72,15 @@ func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.T
 	return m
 }
 
+// PickPeerURL randomly chooses one of the Member's PeerURLs.
+// It panics if the Member has no PeerURLs.
+func (m *Member) PickPeerURL() string {
+	if len(m.PeerURLs) == 0 {
+		panic("member should always have some peer url")
+	}
+	return m.PeerURLs[rand.Intn(len(m.PeerURLs))]
+}
+
 func memberStoreKey(id uint64) string {
 	return path.Join(storeMembersPrefix, idAsHex(id))
 }
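
Pick has moved from Cluster onto Member as PickPeerURL, and it now panics instead of returning an empty string when no peer URLs exist; callers such as send in sender.go below first look the member up and treat a nil result as the unknown-peer case. A short sketch, building the member the same way NewClusterFromString does (names and URLs are illustrative):

	package etcdserver

	import (
		"fmt"

		"github.com/coreos/etcd/pkg/flags"
		"github.com/coreos/etcd/pkg/types"
	)

	// examplePickPeerURL is a hypothetical snippet, not part of the commit.
	func examplePickPeerURL() {
		urls := types.URLs(*flags.NewURLsValue("http://10.0.0.1:2380,http://10.0.0.2:2380"))
		m := NewMember("node1", urls, "mycluster", nil)
		fmt.Println(m.PickPeerURL()) // prints one of the two URLs, chosen at random
	}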

+ 33 - 0
etcdserver/member_test.go

@@ -53,3 +53,36 @@ func TestMemberTime(t *testing.T) {
 		}
 	}
 }
+
+func TestMemberPick(t *testing.T) {
+	tests := []struct {
+		memb *Member
+		urls map[string]bool
+	}{
+		{
+			newTestMemberp(1, []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"}, "", nil),
+			map[string]bool{
+				"abc": true,
+				"def": true,
+				"ghi": true,
+				"jkl": true,
+				"mno": true,
+				"pqr": true,
+				"stu": true,
+			},
+		},
+		{
+			newTestMemberp(2, []string{"xyz"}, "", nil),
+			map[string]bool{"xyz": true},
+		},
+	}
+	for i, tt := range tests {
+		for j := 0; j < 1000; j++ {
+			a := tt.memb.PickPeerURL()
+			if !tt.urls[a] {
+			t.Errorf("#%d: returned URL %q not in expected range!", i, a)
+				break
+			}
+		}
+	}
+}

+ 121 - 0
etcdserver/sender.go

@@ -0,0 +1,121 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package etcdserver
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+const raftPrefix = "/raft"
+
+// Sender creates the default production sender used to transport raft messages
+// in the cluster. The returned sender will update the given ServerStats and
+// LeaderStats appropriately.
+func Sender(t *http.Transport, cl *Cluster, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
+	c := &http.Client{Transport: t}
+
+	return func(msgs []raftpb.Message) {
+		for _, m := range msgs {
+			// TODO: reuse go routines
+			// limit the number of outgoing connections for the same receiver
+			go send(c, cl, m, ss, ls)
+		}
+	}
+}
+
+// send uses the given client to send a message to a member in the given
+// Cluster, retrying up to 3 times for each message. The given
+// ServerStats and LeaderStats are updated appropriately.
+func send(c *http.Client, cl *Cluster, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
+	cid := cl.ID()
+	// TODO (xiangli): reasonable retry logic
+	for i := 0; i < 3; i++ {
+		memb := cl.Member(m.To)
+		if memb == nil {
+			// TODO: unknown peer id.. what do we do? I
+			// don't think this should ever happen; need to
+			// look into this further.
+			log.Printf("etcdhttp: no member for %d", m.To)
+			return
+		}
+		u := fmt.Sprintf("%s%s", memb.PickPeerURL(), raftPrefix)
+
+		// TODO: don't block. we should be able to have 1000s
+		// of messages out at a time.
+		data, err := m.Marshal()
+		if err != nil {
+			log.Println("etcdhttp: dropping message:", err)
+			return // drop bad message
+		}
+		if m.Type == raftpb.MsgApp {
+			ss.SendAppendReq(len(data))
+		}
+		to := idAsHex(m.To)
+		fs := ls.Follower(to)
+
+		start := time.Now()
+		sent := httpPost(c, u, cid, data)
+		end := time.Now()
+		if sent {
+			fs.Succ(end.Sub(start))
+			return
+		}
+		fs.Fail()
+		// TODO: backoff
+	}
+}
+
+// httpPost POSTs a data payload to a url using the given client. Returns true
+// if the POST succeeds, false on any failure.
+func httpPost(c *http.Client, url string, cid uint64, data []byte) bool {
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
+	if err != nil {
+		// TODO: log the error?
+		return false
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+	req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(cid, 16))
+	resp, err := c.Do(req)
+	if err != nil {
+		// TODO: log the error?
+		return false
+	}
+	resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusPreconditionFailed:
+		// TODO: shutdown the etcdserver gracefully?
+		log.Panicf("clusterID mismatch")
+		return false
+	case http.StatusForbidden:
+		// TODO: stop the server
+		log.Panicf("the member has been removed")
+		return false
+	case http.StatusNoContent:
+		return true
+	default:
+		return false
+	}
+}
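
The sender itself is unchanged apart from taking a *Cluster directly. A hedged sketch of how it is wired, mirroring the NewServer changes in server.go below; sstats and lstats are the stats values built there, and the helper name is hypothetical (the fragment assumes the etcdserver package's imports):

	// buildSend is a hypothetical helper, not part of the commit.
	func buildSend(cfg *ServerConfig, ss *stats.ServerStats, ls *stats.LeaderStats) func([]raftpb.Message) {
		// cfg.Cluster replaces the old ClusterStore as the source of peer URLs.
		return Sender(cfg.Transport, cfg.Cluster, ss, ls)
	}

The raft node's outgoing messages (raft.Ready.Messages) are then handed straight to the returned function.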

+ 32 - 34
etcdserver/server.go

@@ -137,10 +137,9 @@ type EtcdServer struct {
 	done       chan struct{}
 	stopped    chan struct{}
 	id         uint64
-	clusterID  uint64
 	attributes Attributes
 
-	ClusterStore ClusterStore
+	Cluster *Cluster
 
 	node  raft.Node
 	store store.Store
@@ -176,12 +175,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 	st := store.New()
 	var w *wal.WAL
 	var n raft.Node
-	var id, cid uint64
+	var id uint64
 	if !wal.Exist(cfg.WALDir()) {
 		if err := cfg.VerifyBootstrapConfig(); err != nil {
 			log.Fatalf("etcdserver: %v", err)
 		}
-		m := cfg.Cluster.FindName(cfg.Name)
+		m := cfg.Cluster.MemberByName(cfg.Name)
 		if cfg.ShouldDiscover() {
 			d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String())
 			if err != nil {
@@ -191,11 +190,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 			if err != nil {
 				log.Fatalf("etcdserver: %v", err)
 			}
-			if err = cfg.Cluster.SetMembersFromString(s); err != nil {
+			if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.name, s); err != nil {
 				log.Fatalf("etcdserver: %v", err)
 			}
 		}
-		id, cid, n, w = startNode(cfg)
+		cfg.Cluster.SetStore(st)
+		id, n, w = startNode(cfg)
 	} else {
 		if cfg.ShouldDiscover() {
 			log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
@@ -210,11 +210,10 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 			st.Recovery(snapshot.Data)
 			index = snapshot.Index
 		}
-		id, cid, n, w = restartNode(cfg, index, snapshot)
+		cfg.Cluster = NewClusterFromStore(cfg.Cluster.name, st)
+		id, n, w = restartNode(cfg, index, snapshot)
 	}
 
-	cls := &clusterStore{Store: st, id: cid}
-
 	sstats := &stats.ServerStats{
 		Name: cfg.Name,
 		ID:   idAsHex(id),
@@ -225,19 +224,18 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 		store:      st,
 		node:       n,
 		id:         id,
-		clusterID:  cid,
 		attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+		Cluster:    cfg.Cluster,
 		storage: struct {
 			*wal.WAL
 			*snap.Snapshotter
 		}{w, ss},
-		stats:        sstats,
-		lstats:       lstats,
-		send:         Sender(cfg.Transport, cls, sstats, lstats),
-		Ticker:       time.Tick(100 * time.Millisecond),
-		SyncTicker:   time.Tick(500 * time.Millisecond),
-		snapCount:    cfg.SnapCount,
-		ClusterStore: cls,
+		stats:      sstats,
+		lstats:     lstats,
+		send:       Sender(cfg.Transport, cfg.Cluster, sstats, lstats),
+		Ticker:     time.Tick(100 * time.Millisecond),
+		SyncTicker: time.Tick(500 * time.Millisecond),
+		snapCount:  cfg.SnapCount,
 	}
 	return s
 }
@@ -268,7 +266,7 @@ func (s *EtcdServer) start() {
 }
 
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
-	if s.ClusterStore.IsRemoved(m.From) {
+	if s.Cluster.IsIDRemoved(m.From) {
 		return ErrRemoved
 	}
 	return s.node.Step(ctx, m)
@@ -497,7 +495,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
 	req := pb.Request{
 		ID:     GenID(),
 		Method: "PUT",
-		Path:   memberStoreKey(s.id) + attributesSuffix,
+		Path:   path.Join(memberStoreKey(s.id), attributesSuffix),
 		Val:    string(b),
 	}
 
@@ -599,22 +597,22 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes []uint64) error
 	s.node.ApplyConfChange(cc)
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode:
-		var m Member
-		if err := json.Unmarshal(cc.Context, &m); err != nil {
+		m := new(Member)
+		if err := json.Unmarshal(cc.Context, m); err != nil {
 			panic("unexpected unmarshal error")
 		}
 		if cc.NodeID != m.ID {
 			panic("unexpected nodeID mismatch")
 		}
-		s.ClusterStore.Add(m)
+		s.Cluster.AddMember(m)
 	case raftpb.ConfChangeRemoveNode:
-		s.ClusterStore.Remove(cc.NodeID)
+		s.Cluster.RemoveMember(cc.NodeID)
 	}
 	return nil
 }
 
 func (s *EtcdServer) checkConfChange(cc raftpb.ConfChange, nodes []uint64) error {
-	if s.ClusterStore.IsRemoved(cc.NodeID) {
+	if s.Cluster.IsIDRemoved(cc.NodeID) {
 		return ErrIDRemoved
 	}
 	switch cc.Type {
@@ -644,12 +642,11 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
 	s.storage.Cut()
 }
 
-func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
+func startNode(cfg *ServerConfig) (id uint64, n raft.Node, w *wal.WAL) {
 	var err error
 	// TODO: remove the discoveryURL when it becomes part of the source for
 	// generating nodeID.
-	member := cfg.Cluster.FindName(cfg.Name)
-	cfg.Cluster.GenID([]byte(cfg.DiscoveryURL))
+	member := cfg.Cluster.MemberByName(cfg.Name)
 	metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: member.ID, ClusterID: cfg.Cluster.ID()})
 	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
 		log.Fatal(err)
@@ -657,19 +654,19 @@ func startNode(cfg *ServerConfig) (id, cid uint64, n raft.Node, w *wal.WAL) {
 	ids := cfg.Cluster.MemberIDs()
 	peers := make([]raft.Peer, len(ids))
 	for i, id := range ids {
-		ctx, err := json.Marshal((*cfg.Cluster).FindID(id))
+		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
 		if err != nil {
 			log.Fatal(err)
 		}
 		peers[i] = raft.Peer{ID: id, Context: ctx}
 	}
-	id, cid = member.ID, cfg.Cluster.ID()
-	log.Printf("etcdserver: start node %d in cluster %d", id, cid)
-	n = raft.StartNode(member.ID, peers, 10, 1)
+	id = member.ID
+	log.Printf("etcdserver: start node %x in cluster %x", id, cfg.Cluster.ID())
+	n = raft.StartNode(id, peers, 10, 1)
 	return
 }
 
-func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id, cid uint64, n raft.Node, w *wal.WAL) {
+func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id uint64, n raft.Node, w *wal.WAL) {
 	var err error
 	// restart a node from previous wal
 	if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
@@ -682,8 +679,9 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id
 
 	var metadata pb.Metadata
 	pbutil.MustUnmarshal(&metadata, wmetadata)
-	id, cid = metadata.NodeID, metadata.ClusterID
-	log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cid, st.Commit)
+	id = metadata.NodeID
+	cfg.Cluster.SetID(metadata.ClusterID)
+	log.Printf("etcdserver: restart member %x in cluster %x at commit index %d", id, cfg.Cluster.ID(), st.Commit)
 	n = raft.RestartNode(id, 10, 1, snapshot, st, ents)
 	return
 }
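
applyConfChange now unmarshals a *Member out of the conf-change context and hands it to Cluster.AddMember, so the context must carry the member's JSON encoding. A hedged sketch of building such a conf change; the helper name is hypothetical and the fragment assumes the etcdserver package's imports:

	// confChangeForAdd builds the payload applyConfChange expects for
	// ConfChangeAddNode.
	func confChangeForAdd(m *Member) raftpb.ConfChange {
		ctx, err := json.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}
		// On apply, Context is unmarshaled back into a *Member and passed to
		// Cluster.AddMember; NodeID must equal m.ID, or applyConfChange panics.
		return raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: m.ID, Context: ctx}
	}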

+ 47 - 44
etcdserver/server_test.go

@@ -423,10 +423,10 @@ func TestApplyConfChangeError(t *testing.T) {
 	}
 	for i, tt := range tests {
 		n := &nodeRecorder{}
-		cs := &removedClusterStore{removed: removed}
+		cl := &Cluster{removed: removed}
 		srv := &EtcdServer{
-			node:         n,
-			ClusterStore: cs,
+			node:    n,
+			Cluster: cl,
 		}
 		err := srv.applyConfChange(tt.cc, nodes)
 		if err != tt.werr {
@@ -471,13 +471,15 @@ func testServer(t *testing.T, ns uint64) {
 		n := raft.StartNode(id, members, 10, 1)
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
+		cl := newCluster("abc")
+		cl.SetStore(&storeRecorder{})
 		srv := &EtcdServer{
-			node:         n,
-			store:        store.New(),
-			send:         send,
-			storage:      &storageRecorder{},
-			Ticker:       tk.C,
-			ClusterStore: &clusterStoreRecorder{},
+			node:    n,
+			store:   store.New(),
+			send:    send,
+			storage: &storageRecorder{},
+			Ticker:  tk.C,
+			Cluster: cl,
 		}
 		srv.start()
 		ss[i] = srv
@@ -538,13 +540,15 @@ func TestDoProposal(t *testing.T) {
 		tk := make(chan time.Time)
 		// this makes <-tk always successful, which accelerates internal clock
 		close(tk)
+		cl := newCluster("abc")
+		cl.SetStore(&storeRecorder{})
 		srv := &EtcdServer{
-			node:         n,
-			store:        st,
-			send:         func(_ []raftpb.Message) {},
-			storage:      &storageRecorder{},
-			Ticker:       tk,
-			ClusterStore: &clusterStoreRecorder{},
+			node:    n,
+			store:   st,
+			send:    func(_ []raftpb.Message) {},
+			storage: &storageRecorder{},
+			Ticker:  tk,
+			Cluster: cl,
 		}
 		srv.start()
 		resp, err := srv.Do(ctx, tt)
@@ -782,12 +786,12 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	s := &EtcdServer{
-		store:        st,
-		send:         func(_ []raftpb.Message) {},
-		storage:      p,
-		node:         n,
-		snapCount:    10,
-		ClusterStore: &clusterStoreRecorder{},
+		store:     st,
+		send:      func(_ []raftpb.Message) {},
+		storage:   p,
+		node:      n,
+		snapCount: 10,
+		Cluster:   &Cluster{},
 	}
 
 	s.start()
@@ -872,19 +876,20 @@ func TestAddMember(t *testing.T) {
 	n.readyc <- raft.Ready{
 		SoftState: &raft.SoftState{
 			RaftState: raft.StateLeader,
-			Nodes:     []uint64{2, 3},
+			Nodes:     []uint64{2345, 3456},
 		},
 	}
-	cs := &clusterStoreRecorder{}
+	cl := newTestCluster(nil)
+	cl.SetStore(&storeRecorder{})
 	s := &EtcdServer{
-		node:         n,
-		store:        &storeRecorder{},
-		send:         func(_ []raftpb.Message) {},
-		storage:      &storageRecorder{},
-		ClusterStore: cs,
+		node:    n,
+		store:   &storeRecorder{},
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
+		Cluster: cl,
 	}
 	s.start()
-	m := Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
+	m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
 	err := s.AddMember(context.TODO(), m)
 	gaction := n.Action()
 	s.Stop()
@@ -896,9 +901,8 @@ func TestAddMember(t *testing.T) {
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}
-	wcsactions := []action{{name: "Add", params: []interface{}{m}}}
-	if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
-		t.Errorf("csaction = %v, want %v", g, wcsactions)
+	if cl.Member(1234) == nil {
+		t.Errorf("member with id 1234 was not added")
 	}
 }
 
@@ -908,20 +912,20 @@ func TestRemoveMember(t *testing.T) {
 	n.readyc <- raft.Ready{
 		SoftState: &raft.SoftState{
 			RaftState: raft.StateLeader,
-			Nodes:     []uint64{1, 2, 3},
+			Nodes:     []uint64{1234, 2345, 3456},
 		},
 	}
-	cs := &clusterStoreRecorder{}
+	cl := newTestCluster([]Member{{ID: 1234}})
+	cl.SetStore(&storeRecorder{})
 	s := &EtcdServer{
-		node:         n,
-		store:        &storeRecorder{},
-		send:         func(_ []raftpb.Message) {},
-		storage:      &storageRecorder{},
-		ClusterStore: cs,
+		node:    n,
+		store:   &storeRecorder{},
+		send:    func(_ []raftpb.Message) {},
+		storage: &storageRecorder{},
+		Cluster: cl,
 	}
 	s.start()
-	id := uint64(1)
-	err := s.RemoveMember(context.TODO(), id)
+	err := s.RemoveMember(context.TODO(), 1234)
 	gaction := n.Action()
 	s.Stop()
 
@@ -932,9 +936,8 @@ func TestRemoveMember(t *testing.T) {
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 	}
-	wcsactions := []action{{name: "Remove", params: []interface{}{id}}}
-	if g := cs.Action(); !reflect.DeepEqual(g, wcsactions) {
-		t.Errorf("csaction = %v, want %v", g, wcsactions)
+	if cl.Member(1234) != nil {
+		t.Errorf("member with id 1234 was not removed")
 	}
 }
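
Both membership tests build clusters through a newTestCluster helper that this diff does not show. Judging only from the call sites above, a plausible reconstruction (hypothetical; the real helper may differ, e.g. in how it seeds the removed set) is:

// hypothetical reconstruction of the helper used by the tests above
func newTestCluster(membs []Member) *Cluster {
	c := &Cluster{members: make(map[uint64]*Member)}
	for i := range membs {
		c.members[membs[i].ID] = &membs[i]
	}
	return c
}
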
 

+ 7 - 9
integration/cluster_test.go

@@ -85,19 +85,14 @@ func (c *cluster) Launch(t *testing.T) {
 	}
 
 	lns := make([]net.Listener, c.Size)
-	clusterCfg := etcdserver.NewCluster(clusterName)
+	addrs := make([]string, c.Size)
 	for i := 0; i < c.Size; i++ {
 		l := newLocalListener(t)
 		// each member claims only one peer listener
 		lns[i] = l
-		listenURLs, err := types.NewURLs([]string{"http://" + l.Addr().String()})
-		if err != nil {
-			t.Fatal(err)
-		}
-		if _, err = clusterCfg.AddMemberFromURLs(c.name(i), listenURLs); err != nil {
-			t.Fatal(err)
-		}
+		addrs[i] = fmt.Sprintf("%v=%v", c.name(i), "http://"+l.Addr().String())
 	}
+	clusterStr := strings.Join(addrs, ",")
 
 	var err error
 	for i := 0; i < c.Size; i++ {
@@ -114,7 +109,10 @@ func (c *cluster) Launch(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		m.Cluster = clusterCfg
+		m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
+		if err != nil {
+			t.Fatal(err)
+		}
 		m.ClusterState = etcdserver.ClusterStateValueNew
 		m.Transport, err = transport.NewTransport(transport.TLSInfo{})
 		if err != nil {
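
The harness now hands every member the same flat name=url string and lets each member parse its own Cluster, instead of sharing one mutated clusterCfg. A sketch of the string a three-member harness would build, with illustrative addresses (assumes the integration package's strings and etcdserver imports):

// sketch: the flat cluster string for a three-member harness;
// hosts and ports are illustrative
func exampleClusterString() (string, error) {
	addrs := []string{
		"node0=http://127.0.0.1:21001",
		"node1=http://127.0.0.1:21002",
		"node2=http://127.0.0.1:21003",
	}
	s := strings.Join(addrs, ",")
	// every member parses the identical string, so every member
	// derives the same membership and the same cluster identity
	if _, err := etcdserver.NewClusterFromString("integration-cluster", s); err != nil {
		return "", err
	}
	return s, nil
}
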

+ 14 - 6
main.go

@@ -29,6 +29,7 @@ import (
 	"github.com/coreos/etcd/pkg"
 	flagtypes "github.com/coreos/etcd/pkg/flags"
 	"github.com/coreos/etcd/pkg/transport"
+	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/proxy"
 )
 
@@ -262,7 +263,6 @@ func startProxy() {
 
 // setupCluster sets up the cluster definition for bootstrap or discovery.
 func setupCluster() error {
-	cluster = etcdserver.NewCluster(*initialClusterName)
 	set := make(map[string]bool)
 	fs.Visit(func(f *flag.Flag) {
 		set[f.Name] = true
@@ -275,17 +275,25 @@ func setupCluster() error {
 		return err
 	}
 
+	err = nil
 	switch {
 	case set["discovery"]:
-		cluster = etcdserver.NewCluster(*durl)
-		_, err := cluster.AddMemberFromURLs(*name, apurls)
-		return err
+		clusterStr := genClusterString(*name, apurls)
+		cluster, err = etcdserver.NewClusterFromString(*durl, clusterStr)
 	case set["initial-cluster"]:
 		fallthrough
 	default:
 		// We're statically configured, and cluster has appropriately been set.
 		// Try to configure by indexing the static cluster by name.
-		cluster.SetMembersFromString(*initialCluster)
+		cluster, err = etcdserver.NewClusterFromString(*initialClusterName, *initialCluster)
+	}
+	return err
+}
+
+func genClusterString(name string, urls types.URLs) string {
+	addrs := make([]string, 0, len(urls))
+	for _, u := range urls {
+		addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
 	}
-	return nil
+	return strings.Join(addrs, ",")
 }
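
A useful side effect of this refactor is that static and discovery bootstrap now share one wire format: genClusterString emits exactly the name=url form that NewClusterFromString consumes. A sketch of the discovery branch collapsed into one helper, with illustrative URLs:

// sketch: the discovery path in one piece, reusing genClusterString
func discoveryCluster(durl, name string, rawURLs []string) (*etcdserver.Cluster, error) {
	urls, err := types.NewURLs(rawURLs)
	if err != nil {
		return nil, err
	}
	// e.g. name="node1", rawURLs=["http://10.0.0.1:2380"] yields
	// "node1=http://10.0.0.1:2380"
	return etcdserver.NewClusterFromString(durl, genClusterString(name, urls))
}
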

+ 30 - 0
main_test.go

@@ -0,0 +1,30 @@
+package main
+
+import (
+	"testing"
+
+	"github.com/coreos/etcd/pkg/types"
+)
+
+func TestGenClusterString(t *testing.T) {
+	tests := []struct {
+		name string
+		urls []string
+		wstr string
+	}{
+		{
+			"node1", []string{"http://0.0.0.0:2379", "http://1.1.1.1:2379"},
+			"node1=http://0.0.0.0:2379,node1=http://1.1.1.1:2379",
+		},
+	}
+	for i, tt := range tests {
+		urls, err := types.NewURLs(tt.urls)
+		if err != nil {
+			t.Fatalf("unexpected new urls error: %v", err)
+		}
+		str := genClusterString(tt.name, urls)
+		if str != tt.wstr {
+			t.Errorf("#%d: cluster = %s, want %s", i, str, tt.wstr)
+		}
+	}
+}