
Add proxy mode.

Ben Johnson, 12 years ago · parent commit 1d961b8e56

+ 7 - 0
error/error.go

@@ -51,6 +51,10 @@ const (
 
 	EcodeWatcherCleared    = 400
 	EcodeEventIndexCleared = 401
+	EcodeProxyInternal = 402
+	EcodeInvalidActiveSize = 403
+	EcodeInvalidPromoteDelay = 404
+	EcodePromoteError = 405
 )
 
 func init() {
@@ -86,6 +90,9 @@ func init() {
 	// etcd related errors
 	errors[EcodeWatcherCleared] = "watcher is cleared due to etcd recovery"
 	errors[EcodeEventIndexCleared] = "The event in requested index is outdated and cleared"
+	errors[EcodeProxyInternal] = "Proxy Internal Error"
+	errors[EcodeInvalidActiveSize] = "Invalid active size"
+	errors[EcodeInvalidPromoteDelay] = "Invalid promote delay"
 
 }
 

+ 33 - 0
server/cluster_config.go

@@ -0,0 +1,33 @@
+package server
+
+import (
+	"time"
+)
+
+const (
+	// DefaultActiveSize is the default number of active followers allowed.
+	DefaultActiveSize = 9
+
+	// DefaultPromoteDelay is the default elapsed time before promotion.
+	DefaultPromoteDelay = int((30 * time.Minute) / time.Second)
+)
+
+// ClusterConfig represents cluster-wide configuration settings.
+// These settings can only be changed through Raft.
+type ClusterConfig struct {
+	// ActiveSize is the maximum number of nodes that can join as Raft followers.
+	// Nodes that join the cluster after the limit is reached are proxies.
+	ActiveSize int `json:"activeSize"`
+
+	// PromoteDelay is the amount of time, in seconds, after a node is
+	// unreachable that it will be swapped out for a proxy node, if available.
+	PromoteDelay int `json:"promoteDelay"`
+}
+
+// NewClusterConfig returns a cluster configuration with default settings.
+func NewClusterConfig() *ClusterConfig {
+	return &ClusterConfig{
+		ActiveSize: DefaultActiveSize,
+		PromoteDelay: DefaultPromoteDelay,
+	}
+}

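For reference, a minimal standalone sketch of what the default configuration serializes to, assuming the ClusterConfig struct above with the lower-case promoteDelay JSON tag (the struct is copied into the example so it compiles on its own):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ClusterConfig mirrors the struct introduced in server/cluster_config.go.
type ClusterConfig struct {
	ActiveSize   int `json:"activeSize"`
	PromoteDelay int `json:"promoteDelay"`
}

func main() {
	// Defaults from the patch: 9 active peers, 30 minutes expressed in seconds.
	c := &ClusterConfig{
		ActiveSize:   9,
		PromoteDelay: int((30 * time.Minute) / time.Second),
	}
	b, _ := json.Marshal(c)
	fmt.Println(string(b)) // {"activeSize":9,"promoteDelay":1800}
}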
+ 14 - 8
server/join_command.go

@@ -1,9 +1,9 @@
 package server
 
 import (
+	"bytes"
 	"encoding/binary"
 
-	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/third_party/github.com/coreos/raft"
 )
@@ -40,25 +40,30 @@ func (c *JoinCommand) CommandName() string {
 func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
 	ps, _ := context.Server().Context().(*PeerServer)
 
+	var buf bytes.Buffer
 	b := make([]byte, 8)
-	binary.PutUvarint(b, context.CommitIndex())
+	n := binary.PutUvarint(b, context.CommitIndex())
+	buf.Write(b[:n])
 
 	// Make sure we're not getting a cached value from the registry.
 	ps.registry.Invalidate(c.Name)
 
 	// Check if the join command is from a previous peer, who lost all its previous log.
 	if _, ok := ps.registry.ClientURL(c.Name); ok {
-		return b, nil
+		binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer.
+		return buf.Bytes(), nil
 	}
 
 	// Check peer number in the cluster
-	if ps.registry.Count() == ps.Config.MaxClusterSize {
-		log.Debug("Reject join request from ", c.Name)
-		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
+	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
+		log.Debug("Join as proxy ", c.Name)
+		ps.registry.RegisterProxy(c.Name, c.RaftURL, c.EtcdURL)
+		binary.Write(&buf, binary.BigEndian, uint8(1)) // Mark as proxy.
+		return buf.Bytes(), nil
 	}
 
 	// Add to shared peer registry.
-	ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL)
+	ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL)
 
 	// Add peer in raft
 	err := context.Server().AddPeer(c.Name, "")
@@ -69,7 +74,8 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
 		ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
 	}
 
-	return b, err
+	binary.Write(&buf, binary.BigEndian, uint8(0)) // Mark as peer.
+	return buf.Bytes(), err
 }
 
 func (c *JoinCommand) NodeName() string {

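The join response is now a small binary frame rather than a bare index: a uvarint commit index followed by a single mode byte, 0 for peer and 1 for proxy. A self-contained sketch of the round trip, mirroring the encode in JoinCommand.Apply and the decode in joinByPeer (the commit index value is made up):

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode the way JoinCommand.Apply does: uvarint commit index, then one mode byte.
	var buf bytes.Buffer
	b := make([]byte, 8)
	n := binary.PutUvarint(b, 42) // 42 is a made-up commit index
	buf.Write(b[:n])
	binary.Write(&buf, binary.BigEndian, uint8(1)) // 1 marks the joiner as a proxy

	// Decode the way PeerServer.joinByPeer does.
	r := bufio.NewReader(&buf)
	joinIndex, _ := binary.ReadUvarint(r)
	mode, _ := binary.ReadUvarint(r) // a single byte below 0x80 decodes to itself
	fmt.Println(joinIndex, mode)     // 42 1
}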
+ 131 - 3
server/peer_server.go

@@ -2,12 +2,16 @@ package server
 
 import (
 	"bytes"
+	"bufio"
 	"encoding/binary"
 	"encoding/json"
 	"fmt"
+	"io"
 	"io/ioutil"
+	"math/rand"
 	"net/http"
 	"net/url"
+	"sort"
 	"strconv"
 	"time"
 
@@ -22,19 +26,20 @@ import (
 )
 
 const ThresholdMonitorTimeout = 5 * time.Second
+const ActiveMonitorTimeout = 5 * time.Second
 
 type PeerServerConfig struct {
 	Name           string
 	Scheme         string
 	URL            string
 	SnapshotCount  int
-	MaxClusterSize int
 	RetryTimes     int
 	RetryInterval  float64
 }
 
 type PeerServer struct {
 	Config		PeerServerConfig
+	clusterConfig	*ClusterConfig
 	raftServer	raft.Server
 	server		*Server
 	joinIndex	uint64
@@ -43,10 +48,14 @@ type PeerServer struct {
 	registry	*Registry
 	store		store.Store
 	snapConf	*snapshotConf
+	mode        Mode
 
 	closeChan		chan bool
 	timeoutThresholdChan	chan interface{}
 
+	proxyPeerURL string
+	proxyClientURL string
+
 	metrics	*metrics.Bucket
 }
 
@@ -66,6 +75,7 @@ type snapshotConf struct {
 func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
 	s := &PeerServer{
 		Config:		psConfig,
+		clusterConfig: NewClusterConfig(),
 		registry:	registry,
 		store:		store,
 		followersStats:	followersStats,
@@ -100,6 +110,50 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
 	s.raftServer = raftServer
 }
 
+// Mode retrieves the current mode of the server.
+func (s *PeerServer) Mode() Mode {
+	return s.mode
+}
+
+// SetMode updates the current mode of the server.
+// Switching to a peer mode will start the Raft server.
+// Switching to a proxy mode will stop the Raft server.
+func (s *PeerServer) SetMode(mode Mode) {
+	s.mode = mode
+
+	switch mode {
+	case PeerMode:
+		if !s.raftServer.Running() {
+			s.raftServer.Start()
+		}
+	case ProxyMode:
+		if s.raftServer.Running() {
+			s.raftServer.Stop()
+		}
+	}
+}
+
+// ClusterConfig retrieves the current cluster configuration.
+func (s *PeerServer) ClusterConfig() *ClusterConfig {
+	return s.clusterConfig
+}
+
+// SetClusterConfig updates the current cluster configuration.
+// The new configuration is validated before it is applied.
+func (s *PeerServer) SetClusterConfig(c *ClusterConfig) error {
+	// Validate configuration.
+	if c.ActiveSize < 1 {
+		return etcdErr.NewError(etcdErr.EcodeInvalidActiveSize, "Post", 0)
+	} else if c.PromoteDelay < 0 {
+		return etcdErr.NewError(etcdErr.EcodeInvalidPromoteDelay, "Post", 0)
+	}
+
+	s.clusterConfig = c
+	return nil
+}
+
 // Helper function to do discovery and return results in expected format
 func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
 	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
@@ -213,6 +267,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
 
 	go s.monitorSync()
 	go s.monitorTimeoutThreshold(s.closeChan)
+	go s.monitorActive(s.closeChan)
 
 	// open the snapshot
 	if snapshot {
@@ -240,6 +295,8 @@ func (s *PeerServer) HTTPHandler() http.Handler {
 	router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
 	router.HandleFunc("/join", s.JoinHttpHandler)
 	router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
+	router.HandleFunc("/config", s.getClusterConfigHttpHandler).Methods("GET")
+	router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("POST")
 	router.HandleFunc("/vote", s.VoteHttpHandler)
 	router.HandleFunc("/log", s.GetLogHttpHandler)
 	router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
@@ -385,8 +442,30 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 			t.CancelWhenTimeout(req)
 
 			if resp.StatusCode == http.StatusOK {
-				b, _ := ioutil.ReadAll(resp.Body)
-				s.joinIndex, _ = binary.Uvarint(b)
+				r := bufio.NewReader(resp.Body)
+				s.joinIndex, _ = binary.ReadUvarint(r)
+
+				// Determine whether the server joined as a proxy or peer.
+				var mode uint64
+				if mode, err = binary.ReadUvarint(r); err == io.EOF {
+					mode = 0
+				} else if err != nil {
+					log.Debugf("Error reading join mode: %v", err)
+					return err
+				}
+
+				switch mode {
+				case 0:
+					s.SetMode(PeerMode)
+				case 1:
+					s.SetMode(ProxyMode)
+					s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
+					s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
+				default:
+					log.Debugf("Invalid join mode: %v", mode)
+					return fmt.Errorf("Invalid join mode (%d)", mode)
+				}
+
 				return nil
 			}
 			if resp.StatusCode == http.StatusTemporaryRedirect {
@@ -532,3 +611,52 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
 		time.Sleep(ThresholdMonitorTimeout)
 	}
 }
+
+// monitorActive periodically checks the status of cluster nodes and swaps them
+// out for proxies as needed.
+func (s *PeerServer) monitorActive(closeChan chan bool) {
+	for {
+		select {
+		case <-time.After(ActiveMonitorTimeout):
+		case <-closeChan:
+			return
+		}
+
+		// Ignore while this peer is not a leader.
+		if s.raftServer.State() != raft.Leader {
+			continue
+		}
+
+		// Retrieve target active size and actual active size.
+		activeSize := s.ClusterConfig().ActiveSize
+		peerCount := s.registry.PeerCount()
+		peers := s.registry.Peers()
+
+		// Drop this peer's own name from the candidate list; the names come
+		// from a map, so sort them before the binary search.
+		sort.Strings(peers)
+		if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
+			peers = append(peers[:index], peers[index+1:]...)
+		}
+
+		// If we have more active nodes than we should then demote.
+		if peerCount > activeSize {
+			peer := peers[rand.Intn(len(peers))]
+			if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
+				log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
+			}
+			continue
+		}
+	}
+}
+
+
+// Mode represents whether the server is an active peer or if the server is 
+// simply acting as a proxy.
+type Mode string
+
+const (
+	// PeerMode is when the server is an active node in Raft.
+	PeerMode  = Mode("peer")
+
+	// ProxyMode is when the server is an inactive, request-forwarding node.
+	ProxyMode = Mode("proxy")
+)
+

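A standalone sketch of the selection step inside monitorActive: drop the leader's own name from the peer list, then demote a random remaining peer. sort.SearchStrings is a binary search, so the list is sorted first; the node names here are made up:

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// pickDemotionTarget mirrors the selection in monitorActive: remove this
// server's own name from the peer list, then pick a random remaining peer.
func pickDemotionTarget(peers []string, self string) string {
	sort.Strings(peers) // SearchStrings requires a sorted slice
	if i := sort.SearchStrings(peers, self); i < len(peers) && peers[i] == self {
		peers = append(peers[:i], peers[i+1:]...)
	}
	return peers[rand.Intn(len(peers))]
}

func main() {
	// Made-up names; "node1" plays the part of the leader running the monitor.
	fmt.Println(pickDemotionTarget([]string{"node3", "node1", "node2"}, "node1"))
}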
+ 17 - 0
server/peer_server_handlers.go

@@ -188,6 +188,23 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
 	ps.server.Dispatch(command, w, req)
 }
 
+// Returns a JSON-encoded cluster configuration.
+func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
+	json.NewEncoder(w).Encode(&ps.clusterConfig)
+}
+
+// Updates the cluster configuration.
+func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
+	c := &SetClusterConfigCommand{Config: &ClusterConfig{}}
+	if err := json.NewDecoder(req.Body).Decode(&c.Config); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	log.Debugf("[recv] Update Cluster Config Request")
+	ps.server.Dispatch(c, w, req)
+}
+
 // Response to the name request
 func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
 	log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)

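A usage sketch for the new /config routes registered on the peer server's HTTP handler. The address below is an assumption, and the JSON body just mirrors the ClusterConfig fields:

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Hypothetical peer (raft) address; /config is served by the peer server.
	base := "http://127.0.0.1:7001"

	// Read the current cluster configuration.
	resp, err := http.Get(base + "/config")
	if err != nil {
		panic(err)
	}
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body)) // e.g. {"activeSize":9,"promoteDelay":1800}

	// Update it; the handler decodes this into a SetClusterConfigCommand.
	payload := []byte(`{"activeSize":5,"promoteDelay":600}`)
	resp, err = http.Post(base+"/config", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println(resp.Status)
}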
+ 64 - 0
server/promote_command.go

@@ -0,0 +1,64 @@
+package server
+
+import (
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/third_party/github.com/coreos/raft"
+)
+
+func init() {
+	raft.RegisterCommand(&PromoteCommand{})
+}
+
+// PromoteCommand represents a Raft command for converting a proxy to a peer.
+type PromoteCommand struct {
+	Name string `json:"name"`
+}
+
+// CommandName returns the name of the command.
+func (c *PromoteCommand) CommandName() string {
+	return "etcd:promote"
+}
+
+// Apply promotes a named proxy to a peer.
+func (c *PromoteCommand) Apply(context raft.Context) (interface{}, error) {
+	ps, _ := context.Server().Context().(*PeerServer)
+	config := ps.ClusterConfig()
+
+	// If the cluster is already at the active size then return an error.
+	if ps.registry.PeerCount() >= config.ActiveSize {
+		return nil, etcdErr.NewError(etcdErr.EcodePromoteError, "", 0)
+	}
+
+	// If the proxy doesn't exist then return an error.
+	if !ps.registry.ProxyExists(c.Name) {
+		return nil, etcdErr.NewError(etcdErr.EcodePromoteError, "", 0)
+	}
+
+	// Retrieve proxy settings.
+	proxyClientURL, _ := ps.registry.ProxyClientURL(c.Name)
+	proxyPeerURL, _ := ps.registry.ProxyPeerURL(c.Name)
+
+	// Remove from registry as a proxy.
+	if err := ps.registry.UnregisterProxy(c.Name); err != nil {
+		log.Info("Cannot remove proxy: ", c.Name)
+		return nil, err
+	}
+
+	// Add to shared peer registry using the URLs recorded for the proxy.
+	ps.registry.RegisterPeer(c.Name, proxyPeerURL, proxyClientURL)
+
+	// Add peer in raft
+	err := context.Server().AddPeer(c.Name, "")
+
+	// Add peer stats
+	if c.Name != ps.RaftServer().Name() {
+		ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
+		ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
+	}
+
+	return nil, err
+}
+
+func (c *PromoteCommand) NodeName() string {
+	return c.Name
+}

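A small sketch of how a promote request is represented as a raft command; the struct is copied from the patch and the node name is made up. raft.RegisterCommand keys the decoder on CommandName, and the JSON body is what ends up in the log:

package main

import (
	"encoding/json"
	"fmt"
)

// PromoteCommand mirrors the struct added in server/promote_command.go.
type PromoteCommand struct {
	Name string `json:"name"`
}

// CommandName matches the name the command is registered under.
func (c *PromoteCommand) CommandName() string { return "etcd:promote" }

func main() {
	c := &PromoteCommand{Name: "node10"} // made-up node name
	b, _ := json.Marshal(c)
	fmt.Println(c.CommandName(), string(b)) // etcd:promote {"name":"node10"}
}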
+ 142 - 38
server/registry.go

@@ -13,13 +13,17 @@ import (
 )
 
 // The location of the peer URL data.
-const RegistryKey = "/_etcd/machines"
+const RegistryPeerKey = "/_etcd/machines"
+
+// The location of the proxy URL data.
+const RegistryProxyKey = "/_etcd/proxies"
 
 // The Registry stores URL information for nodes.
 type Registry struct {
 	sync.Mutex
 	store store.Store
-	nodes map[string]*node
+	peers map[string]*node
+	proxies map[string]*node
 }
 
 // The internal storage format of the registry.
@@ -33,61 +37,126 @@ type node struct {
 func NewRegistry(s store.Store) *Registry {
 	return &Registry{
 		store: s,
-		nodes: make(map[string]*node),
+		peers: make(map[string]*node),
+		proxies: make(map[string]*node),
+	}
+}
+
+// Peers returns a list of peer names.
+func (r *Registry) Peers() []string {
+	names := make([]string, 0, len(r.peers))
+	for name, _ := range r.peers {
+		names = append(names, name)
+	}
+	return names
+}
+
+// Proxies returns a list of proxy names.
+func (r *Registry) Proxies() []string {
+	names := make([]string, 0, len(r.proxies))
+	for name, _ := range r.proxies {
+		names = append(names, name)
 	}
+	return names
 }
 
-// Adds a node to the registry.
-func (r *Registry) Register(name string, peerURL string, machURL string) error {
+
+// RegisterPeer adds a peer to the registry.
+func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error {
+	// TODO(benbjohnson): Disallow peers that are already proxies.
+	return r.register(RegistryPeerKey, name, peerURL, machURL)
+}
+
+// RegisterProxy adds a proxy to the registry.
+func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error {
+	// TODO(benbjohnson): Disallow proxies that are already peers.
+	return r.register(RegistryProxyKey, name, peerURL, machURL)
+}
+
+func (r *Registry) register(key, name string, peerURL string, machURL string) error {
 	r.Lock()
 	defer r.Unlock()
 
 	// Write data to store.
-	key := path.Join(RegistryKey, name)
 	v := url.Values{}
 	v.Set("raft", peerURL)
 	v.Set("etcd", machURL)
-	_, err := r.store.Create(key, false, v.Encode(), false, store.Permanent)
+	_, err := r.store.Create(path.Join(key, name), false, v.Encode(), false, store.Permanent)
 	log.Debugf("Register: %s", name)
 	return err
 }
 
-// Removes a node from the registry.
-func (r *Registry) Unregister(name string) error {
+// UnregisterPeer removes a peer from the registry.
+func (r *Registry) UnregisterPeer(name string) error {
+	return r.unregister(RegistryPeerKey, name)
+}
+
+// UnregisterProxy removes a proxy from the registry.
+func (r *Registry) UnregisterProxy(name string) error {
+	return r.unregister(RegistryProxyKey, name)
+}
+
+func (r *Registry) unregister(key, name string) error {
 	r.Lock()
 	defer r.Unlock()
 
-	// Remove from cache.
-	// delete(r.nodes, name)
-
 	// Remove the key from the store.
-	_, err := r.store.Delete(path.Join(RegistryKey, name), false, false)
+	_, err := r.store.Delete(path.Join(key, name), false, false)
 	log.Debugf("Unregister: %s", name)
 	return err
 }
 
+// PeerCount returns the number of peers in the cluster.
+func (r *Registry) PeerCount() int {
+	return r.count(RegistryPeerKey)
+}
+
+// ProxyCount returns the number of proxies in the cluster.
+func (r *Registry) ProxyCount() int {
+	return r.count(RegistryProxyKey)
+}
+
 // Returns the number of nodes in the cluster.
 // Returns the number of nodes in the cluster.
-func (r *Registry) Count() int {
-	e, err := r.store.Get(RegistryKey, false, false)
+func (r *Registry) count(key string) int {
+	e, err := r.store.Get(key, false, false)
 	if err != nil {
 		return 0
 	}
 	return len(e.Node.Nodes)
 }
 
+func (r *Registry) PeerExists(name string) bool {
+	return r.exists(RegistryPeerKey, name)
+}
+
+// ProxyExists checks if a proxy with the given name exists.
+func (r *Registry) ProxyExists(name string) bool {
+	return r.exists(RegistryProxyKey, name)
+}
+
+func (r *Registry) exists(key, name string) bool {
+	e, err := r.store.Get(path.Join(key, name), false, false)
+	if err != nil {
+		return false
+	}
+	return (e.Node != nil)
+}
+
+
 // Retrieves the client URL for a given node by name.
 func (r *Registry) ClientURL(name string) (string, bool) {
 	r.Lock()
 	defer r.Unlock()
-	return r.clientURL(name)
+	return r.clientURL(RegistryPeerKey, name)
 }
 
-func (r *Registry) clientURL(name string) (string, bool) {
-	if r.nodes[name] == nil {
-		r.load(name)
+func (r *Registry) clientURL(key, name string) (string, bool) {
+	if r.peers[name] == nil {
+		r.peers[name] = r.load(key, name)
 	}
 
-	if node := r.nodes[name]; node != nil {
+	if node := r.peers[name]; node != nil {
 		return node.url, true
 	}
 
@@ -110,73 +179,108 @@ func (r *Registry) PeerHost(name string) (string, bool) {
 func (r *Registry) PeerURL(name string) (string, bool) {
 	r.Lock()
 	defer r.Unlock()
-	return r.peerURL(name)
+	return r.peerURL(RegistryPeerKey,name)
 }
 
-func (r *Registry) peerURL(name string) (string, bool) {
-	if r.nodes[name] == nil {
-		r.load(name)
+func (r *Registry) peerURL(key, name string) (string, bool) {
+	if r.peers[name] == nil {
+		r.peers[name] = r.load(key, name)
 	}
 
-	if node := r.nodes[name]; node != nil {
+	if node := r.peers[name]; node != nil {
 		return node.peerURL, true
 	}
 
 	return "", false
 }
 
+// Retrieves the client URL for a given proxy by name.
+func (r *Registry) ProxyClientURL(name string) (string, bool) {
+	r.Lock()
+	defer r.Unlock()
+	return r.proxyClientURL(RegistryProxyKey, name)
+}
+
+func (r *Registry) proxyClientURL(key, name string) (string, bool) {
+	if r.proxies[name] == nil {
+		r.proxies[name] = r.load(key, name)
+	}
+	if node := r.proxies[name]; node != nil {
+		return node.url, true
+	}
+	return "", false
+}
+
+// Retrieves the peer URL for a given proxy by name.
+func (r *Registry) ProxyPeerURL(name string) (string, bool) {
+	r.Lock()
+	defer r.Unlock()
+	return r.proxyPeerURL(RegistryProxyKey,name)
+}
+
+func (r *Registry) proxyPeerURL(key, name string) (string, bool) {
+	if r.proxies[name] == nil {
+		r.proxies[name] = r.load(key, name)
+	}
+	if node := r.proxies[name]; node != nil {
+		return node.peerURL, true
+	}
+	return "", false
+}
+
 // Retrieves the Client URLs for all nodes.
 func (r *Registry) ClientURLs(leaderName, selfName string) []string {
-	return r.urls(leaderName, selfName, r.clientURL)
+	return r.urls(RegistryPeerKey, leaderName, selfName, r.clientURL)
 }
 
 // Retrieves the Peer URLs for all nodes.
 func (r *Registry) PeerURLs(leaderName, selfName string) []string {
-	return r.urls(leaderName, selfName, r.peerURL)
+	return r.urls(RegistryPeerKey, leaderName, selfName, r.peerURL)
 }
 
 // Retrieves the URLs for all nodes using url function.
-func (r *Registry) urls(leaderName, selfName string, url func(name string) (string, bool)) []string {
+func (r *Registry) urls(key, leaderName, selfName string, url func(key, name string) (string, bool)) []string {
 	r.Lock()
 	defer r.Unlock()
 
 	// Build list including the leader and self.
 	urls := make([]string, 0)
-	if url, _ := url(leaderName); len(url) > 0 {
+	if url, _ := url(key, leaderName); len(url) > 0 {
 		urls = append(urls, url)
 	}
 
 	// Retrieve a list of all nodes.
-	if e, _ := r.store.Get(RegistryKey, false, false); e != nil {
+	if e, _ := r.store.Get(key, false, false); e != nil {
 		// Lookup the URL for each one.
 		for _, pair := range e.Node.Nodes {
 			_, name := filepath.Split(pair.Key)
-			if url, _ := url(name); len(url) > 0 && name != leaderName {
+			if url, _ := url(key, name); len(url) > 0 && name != leaderName {
 				urls = append(urls, url)
 			}
 		}
 	}
 
-	log.Infof("URLs: %s / %s (%s)", leaderName, selfName, strings.Join(urls, ","))
+	log.Infof("URLs: %s / %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ","))
 
 	return urls
 }
 
 // Removes a node from the cache.
 func (r *Registry) Invalidate(name string) {
-	delete(r.nodes, name)
+	delete(r.peers, name)
+	delete(r.proxies, name)
 }
 
 // Loads the given node by name from the store into the cache.
-func (r *Registry) load(name string) {
+func (r *Registry) load(key, name string) *node {
 	if name == "" {
-		return
+		return nil
 	}
 
 	// Retrieve from store.
-	e, err := r.store.Get(path.Join(RegistryKey, name), false, false)
+	e, err := r.store.Get(path.Join(key, name), false, false)
 	if err != nil {
-		return
+		return nil
 	}
 
 	// Parse as a query string.
@@ -186,7 +290,7 @@ func (r *Registry) load(name string) {
 	}
 
 	// Create node.
-	r.nodes[name] = &node{
+	return &node{
 		url:     m["etcd"][0],
 		peerURL: m["raft"][0],
 	}

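A standalone sketch of the registry's storage layout after this change: one key per node under /_etcd/machines or /_etcd/proxies, with the raft and etcd URLs packed into a query string, as register() writes and load() reads. The URLs are made up:

package main

import (
	"fmt"
	"net/url"
	"path"
)

func main() {
	const peerKey = "/_etcd/machines"
	const proxyKey = "/_etcd/proxies"

	// register() packs both URLs into a query string under the node's key.
	v := url.Values{}
	v.Set("raft", "http://127.0.0.1:7002") // peer (raft) URL, made up
	v.Set("etcd", "http://127.0.0.1:4002") // client URL, made up

	fmt.Println(path.Join(peerKey, "node1"))              // /_etcd/machines/node1
	fmt.Println(path.Join(proxyKey, "node2"), v.Encode()) // /_etcd/proxies/node2 etcd=...&raft=...

	// load() reverses this with url.ParseQuery.
	m, _ := url.ParseQuery(v.Encode())
	fmt.Println(m["raft"][0], m["etcd"][0])
}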
+ 6 - 1
server/remove_command.go

@@ -26,8 +26,13 @@ func (c *RemoveCommand) CommandName() string {
 func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) {
 	ps, _ := context.Server().Context().(*PeerServer)
 
+	// If this is a proxy then remove it and exit.
+	if ps.registry.ProxyExists(c.Name) {
+		return []byte{0}, ps.registry.UnregisterProxy(c.Name)
+	}
+
 	// Remove node from the shared registry.
-	err := ps.registry.Unregister(c.Name)
+	err := ps.registry.UnregisterPeer(c.Name)
 
 	// Delete from stats
 	delete(ps.followersStats.Followers, c.Name)

+ 29 - 16
server/server.go

@@ -164,6 +164,17 @@ func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWrit
 		// Log request.
 		log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr)
 
+		// Forward request along if the server is a proxy.
+		if s.peerServer.Mode() == ProxyMode {
+			if s.peerServer.proxyClientURL == "" {
+				w.Header().Set("Content-Type", "application/json")
+				etcdErr.NewError(etcdErr.EcodeProxyInternal, "", 0).Write(w)
+				return
+			}
+			uhttp.Redirect(s.peerServer.proxyClientURL, w, req)
+			return
+		}
+
 		// Execute handler function and return error if necessary.
 		if err := f(w, req); err != nil {
 			if etcdErr, ok := err.(*etcdErr.Error); ok {
@@ -206,6 +217,9 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 			return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
 		}
 
+		w.Header().Set("X-Leader-Client-URL", s.url)
+		w.Header().Set("X-Leader-Peer-URL", ps.Config.URL)
+
 		// response for raft related commands[join/remove]
 		if b, ok := result.([]byte); ok {
 			w.WriteHeader(http.StatusOK)
@@ -239,25 +253,24 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 
 		return nil
 
-	} else {
-		leader := ps.raftServer.Leader()
-
-		// No leader available.
-		if leader == "" {
-			return etcdErr.NewError(300, "", s.Store().Index())
-		}
+	}
 
-		var url string
-		switch c.(type) {
-		case *JoinCommand, *RemoveCommand:
-			url, _ = ps.registry.PeerURL(leader)
-		default:
-			url, _ = ps.registry.ClientURL(leader)
-		}
-		uhttp.Redirect(url, w, req)
+	leader := ps.raftServer.Leader()
+	if leader == "" {
+		return etcdErr.NewError(300, "", s.Store().Index())
+	}
 
-		return nil
+	var url string
+	switch c.(type) {
+	case *JoinCommand, *RemoveCommand:
+		url, _ = ps.registry.PeerURL(leader)
+	default:
+		url, _ = ps.registry.ClientURL(leader)
 	}
+
+	uhttp.Redirect(url, w, req)
+
+	return nil
 }
 
 // Handler to return the current version of etcd.

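A simplified, standalone sketch of the proxy-mode check added to handleFunc: while in proxy mode, client traffic is redirected to the leader client URL learned at join time. etcd uses its own uhttp.Redirect helper; this sketch uses the standard library with a made-up address, and a real proxy would return an error while the leader URL is still unknown:

package main

import (
	"net/http"
)

// proxyRedirect wraps a handler: while leaderClientURL is known, requests are
// redirected there, roughly what handleFunc now does in proxy mode.
func proxyRedirect(leaderClientURL string, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		if leaderClientURL != "" {
			// Query strings are dropped here for brevity.
			http.Redirect(w, req, leaderClientURL+req.URL.Path, http.StatusTemporaryRedirect)
			return
		}
		next(w, req)
	}
}

func main() {
	handler := proxyRedirect("http://127.0.0.1:4001", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("served locally\n"))
	})
	http.ListenAndServe(":4002", handler)
}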
+ 25 - 0
server/set_cluster_config_command.go

@@ -0,0 +1,25 @@
+package server
+
+import (
+	"github.com/coreos/etcd/third_party/github.com/coreos/raft"
+)
+
+func init() {
+	raft.RegisterCommand(&SetClusterConfigCommand{})
+}
+
+// SetClusterConfigCommand sets the cluster-level configuration.
+type SetClusterConfigCommand struct {
+	Config *ClusterConfig `json:"config"`
+}
+
+// CommandName returns the name of the command.
+func (c *SetClusterConfigCommand) CommandName() string {
+	return "etcd:setClusterConfig"
+}
+
+// Apply updates the cluster configuration.
+func (c *SetClusterConfigCommand) Apply(context raft.Context) (interface{}, error) {
+	ps, _ := context.Server().Context().(*PeerServer)
+	return nil, ps.SetClusterConfig(c.Config)
+}

+ 6 - 0
server/v2/get_handler.go

@@ -122,5 +122,11 @@ func writeHeaders(w http.ResponseWriter, s Server) {
 	w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
 	w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
 	w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
+	if url, ok := s.ClientURL(s.Leader()); ok {
+		w.Header().Set("X-Leader-Client-URL", url)
+	}
+	if url, ok := s.PeerURL(s.Leader()); ok {
+		w.Header().Set("X-Leader-Peer-URL", url)
+	}
 	w.WriteHeader(http.StatusOK)
 }

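Clients can now discover the leader from any v2 GET response via the new headers; a small sketch, with a made-up member address:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Made-up address of a cluster member; every v2 GET now carries the
	// leader's URLs when they are known.
	resp, err := http.Get("http://127.0.0.1:4002/v2/keys/foo")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("leader client URL:", resp.Header.Get("X-Leader-Client-URL"))
	fmt.Println("leader peer URL:  ", resp.Header.Get("X-Leader-Peer-URL"))
}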
+ 45 - 0
tests/functional/proxy_test.go

@@ -0,0 +1,45 @@
+package test
+
+import (
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/tests"
+	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
+	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
+)
+
+// Create a full cluster and then add an extra proxy node.
+func TestProxy(t *testing.T) {
+	clusterSize := 10 // DefaultActiveSize + 1
+	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
+	assert.NoError(t, err)
+	defer DestroyCluster(etcds)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	c := etcd.NewClient(nil)
+	c.SyncCluster()
+
+	// Set key.
+	time.Sleep(time.Second)
+	if _, err := c.Set("foo", "bar", 0); err != nil {
+		panic(err)
+	}
+	time.Sleep(time.Second)
+
+	// Check that all peers and proxies have the value.
+	for i, _ := range etcds {
+		resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000 + (i+1)))
+		if assert.NoError(t, err) {
+			body := tests.ReadBodyJSON(resp)
+			if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
+				assert.Equal(t, node["value"], "bar")
+			}
+		}
+	}
+}

+ 1 - 1
tests/functional/util.go

@@ -109,7 +109,7 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
 			}
 		} else {
 			strI := strconv.Itoa(i + 1)
-			argGroup[i] = []string{"etcd", "-name=node" + strI, "-addr=127.0.0.1:400" + strI, "-peer-addr=127.0.0.1:700" + strI, "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001"}
+			argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001 + i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001 + i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001"}
 			if ssl {
 				argGroup[i] = append(argGroup[i], sslServer2...)
 			}