Browse Source

Refactor commands.

Ben Johnson 12 years ago
parent
commit
89334df5ae

+ 0 - 255
command.go

@@ -1,255 +0,0 @@
-package main
-
-import (
-	"encoding/binary"
-	"fmt"
-	"os"
-	"path"
-	"time"
-
-	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/go-raft"
-)
-
-const commandPrefix = "etcd:"
-
-func commandName(name string) string {
-	return commandPrefix + name
-}
-
-// A command represents an action to be taken on the replicated state machine.
-type Command interface {
-	CommandName() string
-	Apply(server *raft.Server) (interface{}, error)
-}
-
-// Create command
-type CreateCommand struct {
-	Key               string    `json:"key"`
-	Value             string    `json:"value"`
-	ExpireTime        time.Time `json:"expireTime"`
-	IncrementalSuffix bool      `json:"incrementalSuffix"`
-	Force             bool      `json:"force"`
-}
-
-// The name of the create command in the log
-func (c *CreateCommand) CommandName() string {
-	return commandName("create")
-}
-
-// Create node
-func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-
-	e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())
-
-	if err != nil {
-		debug(err)
-		return nil, err
-	}
-
-	return e, nil
-}
-
-// Update command
-type UpdateCommand struct {
-	Key        string    `json:"key"`
-	Value      string    `json:"value"`
-	ExpireTime time.Time `json:"expireTime"`
-}
-
-// The name of the update command in the log
-func (c *UpdateCommand) CommandName() string {
-	return commandName("update")
-}
-
-// Update node
-func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-
-	e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
-
-	if err != nil {
-		debug(err)
-		return nil, err
-	}
-
-	return e, nil
-}
-
-// TestAndSet command
-type TestAndSetCommand struct {
-	Key        string    `json:"key"`
-	Value      string    `json:"value"`
-	ExpireTime time.Time `json:"expireTime"`
-	PrevValue  string    `json: prevValue`
-	PrevIndex  uint64    `json: prevValue`
-}
-
-// The name of the testAndSet command in the log
-func (c *TestAndSetCommand) CommandName() string {
-	return commandName("testAndSet")
-}
-
-// Set the key-value pair if the current value of the key equals to the given prevValue
-func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-
-	e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
-		c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
-
-	if err != nil {
-		debug(err)
-		return nil, err
-	}
-
-	return e, nil
-}
-
-// Delete command
-type DeleteCommand struct {
-	Key       string `json:"key"`
-	Recursive bool   `json:"recursive"`
-}
-
-// The name of the delete command in the log
-func (c *DeleteCommand) CommandName() string {
-	return commandName("delete")
-}
-
-// Delete the key
-func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-
-	e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
-
-	if err != nil {
-		debug(err)
-		return nil, err
-	}
-
-	return e, nil
-}
-
-// JoinCommand
-type JoinCommand struct {
-	RaftVersion string `json:"raftVersion"`
-	Name        string `json:"name"`
-	RaftURL     string `json:"raftURL"`
-	EtcdURL     string `json:"etcdURL"`
-}
-
-func newJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand {
-	return &JoinCommand{
-		RaftVersion: version,
-		Name:        name,
-		RaftURL:     raftUrl,
-		EtcdURL:     etcdUrl,
-	}
-}
-
-// The name of the join command in the log
-func (c *JoinCommand) CommandName() string {
-	return commandName("join")
-}
-
-// Join a server to the cluster
-func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-	r, _ := server.Context().(*raftServer)
-
-	// check if the join command is from a previous machine, who lost all its previous log.
-	e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term())
-
-	b := make([]byte, 8)
-	binary.PutUvarint(b, server.CommitIndex())
-
-	if e != nil {
-		return b, nil
-	}
-
-	// check machine number in the cluster
-	num := machineNum()
-	if num == maxClusterSize {
-		debug("Reject join request from ", c.Name)
-		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
-	}
-
-	addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
-
-	// add peer in raft
-	err := server.AddPeer(c.Name, "")
-
-	// add machine in etcd storage
-	key := path.Join("_etcd/machines", c.Name)
-	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
-	s.Create(key, value, false, false, store.Permanent, server.CommitIndex(), server.Term())
-
-	// add peer stats
-	if c.Name != r.Name() {
-		r.followersStats.Followers[c.Name] = &raftFollowerStats{}
-		r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
-	}
-
-	return b, err
-}
-
-func (c *JoinCommand) NodeName() string {
-	return c.Name
-}
-
-// RemoveCommand
-type RemoveCommand struct {
-	Name string `json:"name"`
-}
-
-// The name of the remove command in the log
-func (c *RemoveCommand) CommandName() string {
-	return commandName("remove")
-}
-
-// Remove a server from the cluster
-func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-	r, _ := server.Context().(*raftServer)
-
-	// remove machine in etcd storage
-	key := path.Join("_etcd/machines", c.Name)
-
-	_, err := s.Delete(key, false, server.CommitIndex(), server.Term())
-	// delete from stats
-	delete(r.followersStats.Followers, c.Name)
-
-	if err != nil {
-		return []byte{0}, err
-	}
-
-	// remove peer in raft
-	err = server.RemovePeer(c.Name)
-
-	if err != nil {
-		return []byte{0}, err
-	}
-
-	if c.Name == server.Name() {
-		// the removed node is this node
-
-		// if the node is not replaying the previous logs
-		// and the node has sent out a join request in this
-		// start. It is sure that this node received a new remove
-		// command and need to be removed
-		if server.CommitIndex() > r.joinIndex && r.joinIndex != 0 {
-			debugf("server [%s] is removed", server.Name())
-			os.Exit(0)
-		} else {
-			// else ignore remove
-			debugf("ignore previous remove command.")
-		}
-	}
-
-	b := make([]byte, 8)
-	binary.PutUvarint(b, server.CommitIndex())
-
-	return b, err
-}

+ 19 - 0
command/command.go

@@ -0,0 +1,19 @@
+package command
+
+import (
+	"github.com/coreos/go-raft"
+)
+
+// A command represents an action to be taken on the replicated state machine.
+type Command interface {
+	CommandName() string
+	Apply(server *raft.Server) (interface{}, error)
+}
+
+// Registers commands to the Raft library.
+func Register() {
+	raft.RegisterCommand(&DeleteCommand{})
+	raft.RegisterCommand(&TestAndSetCommand{})
+	raft.RegisterCommand(&CreateCommand{})
+	raft.RegisterCommand(&UpdateCommand{})
+}

+ 36 - 0
command/create_command.go

@@ -0,0 +1,36 @@
+package command
+
+import (
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+	"time"
+)
+
+// Create command
+type CreateCommand struct {
+	Key               string    `json:"key"`
+	Value             string    `json:"value"`
+	ExpireTime        time.Time `json:"expireTime"`
+	IncrementalSuffix bool      `json:"incrementalSuffix"`
+	Force             bool      `json:"force"`
+}
+
+// The name of the create command in the log
+func (c *CreateCommand) CommandName() string {
+	return "etcd:create"
+}
+
+// Create node
+func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(*store.Store)
+
+	e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		log.Debug(err)
+		return nil, err
+	}
+
+	return e, nil
+}

+ 32 - 0
command/delete_command.go

@@ -0,0 +1,32 @@
+package command
+
+import (
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+)
+
+// The DeleteCommand removes a key from the Store.
+type DeleteCommand struct {
+	Key       string `json:"key"`
+	Recursive bool   `json:"recursive"`
+}
+
+// The name of the delete command in the log
+func (c *DeleteCommand) CommandName() string {
+	return "etcd:delete"
+}
+
+// Delete the key
+func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(*store.Store)
+
+	e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		log.Debug(err)
+		return nil, err
+	}
+
+	return e, nil
+}

+ 38 - 0
command/test_and_set_command.go

@@ -0,0 +1,38 @@
+package command
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+)
+
+// The TestAndSetCommand performs a conditional update on a key in the store.
+type TestAndSetCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+	PrevValue  string    `json: prevValue`
+	PrevIndex  uint64    `json: prevIndex`
+}
+
+// The name of the testAndSet command in the log
+func (c *TestAndSetCommand) CommandName() string {
+	return "etcd:testAndSet"
+}
+
+// Set the key-value pair if the current value of the key equals to the given prevValue
+func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(*store.Store)
+
+	e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
+		c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		log.Debug(err)
+		return nil, err
+	}
+
+	return e, nil
+}

+ 35 - 0
command/update_command.go

@@ -0,0 +1,35 @@
+package command
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+)
+
+// The UpdateCommand updates the value of a key in the Store.
+type UpdateCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+}
+
+// The name of the update command in the log
+func (c *UpdateCommand) CommandName() string {
+	return "etcd:update"
+}
+
+// Update node
+func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(*store.Store)
+
+	e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		log.Debug(err)
+		return nil, err
+	}
+
+	return e, nil
+}

+ 5 - 17
etcd.go

@@ -8,8 +8,9 @@ import (
 	"strings"
 	"strings"
 	"time"
 	"time"
 
 
-	"github.com/coreos/etcd/store"
+	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/server"
 	"github.com/coreos/etcd/server"
+	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
 )
 )
 
 
@@ -20,7 +21,6 @@ import (
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
 var (
 var (
-	verbose     bool
 	veryVerbose bool
 	veryVerbose bool
 
 
 	machines     string
 	machines     string
@@ -43,11 +43,11 @@ var (
 
 
 	cpuprofile string
 	cpuprofile string
 
 
-	cors     string
+	cors string
 )
 )
 
 
 func init() {
 func init() {
-	flag.BoolVar(&verbose, "v", false, "verbose logging")
+	flag.BoolVar(&log.Verbose, "v", false, "verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
 
 
 	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
 	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
@@ -97,12 +97,6 @@ const (
 //
 //
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
-type TLSInfo struct {
-	CertFile string `json:"CertFile"`
-	KeyFile  string `json:"KeyFile"`
-	CAFile   string `json:"CAFile"`
-}
-
 type Info struct {
 type Info struct {
 	Name string `json:"name"`
 	Name string `json:"name"`
 
 
@@ -117,12 +111,6 @@ type Info struct {
 	EtcdTLS TLSInfo `json:"etcdTLS"`
 	EtcdTLS TLSInfo `json:"etcdTLS"`
 }
 }
 
 
-type TLSConfig struct {
-	Scheme string
-	Server tls.Config
-	Client tls.Config
-}
-
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 //
 //
 // Variables
 // Variables
@@ -199,6 +187,7 @@ func main() {
 
 
 	// Create etcd and raft server
 	// Create etcd and raft server
 	r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS)
 	r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS)
+	r.MaxClusterSize = maxClusterSize
 	snapConf = r.newSnapshotConf()
 	snapConf = r.newSnapshotConf()
 
 
 	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r)
 	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r)
@@ -209,4 +198,3 @@ func main() {
 	r.ListenAndServe()
 	r.ListenAndServe()
 	s.ListenAndServe()
 	s.ListenAndServe()
 }
 }
-

+ 44 - 0
log/log.go

@@ -0,0 +1,44 @@
+package log
+
+import (
+	golog "github.com/coreos/go-log/log"
+	"os"
+)
+
+// The Verbose flag turns on verbose logging.
+var Verbose bool = false
+
+var logger *golog.Logger = golog.New("etcd", false,
+	golog.CombinedSink(os.Stdout, "[%s] %s %-9s | %s\n", []string{"prefix", "time", "priority", "message"}))
+
+func Infof(format string, v ...interface{}) {
+	logger.Infof(format, v...)
+}
+
+func Debugf(format string, v ...interface{}) {
+	if Verbose {
+		logger.Debugf(format, v...)
+	}
+}
+
+func Debug(v ...interface{}) {
+	if Verbose {
+		logger.Debug(v...)
+	}
+}
+
+func Warnf(format string, v ...interface{}) {
+	logger.Warningf(format, v...)
+}
+
+func Warn(v ...interface{}) {
+	logger.Warning(v...)
+}
+
+func Fatalf(format string, v ...interface{}) {
+	logger.Fatalf(format, v...)
+}
+
+func Fatal(v ...interface{}) {
+	logger.Fatalln(v...)
+}

+ 0 - 11
machines.go

@@ -1,16 +1,5 @@
 package main
 package main
 
 
-// machineNum returns the number of machines in the cluster
-func machineNum() int {
-	e, err := etcdStore.Get("/_etcd/machines", false, false, 0, 0)
-
-	if err != nil {
-		return 0
-	}
-
-	return len(e.KVPairs)
-}
-
 // getMachines gets the current machines in the cluster
 // getMachines gets the current machines in the cluster
 func (r *raftServer) getMachines(toURL func(string) (string, bool)) []string {
 func (r *raftServer) getMachines(toURL func(string) (string, bool)) []string {
 	peers := r.Peers()
 	peers := r.Peers()

+ 6 - 18
raft_server.go

@@ -11,10 +11,15 @@ import (
 	"net/url"
 	"net/url"
 	"time"
 	"time"
 
 
+	"github.com/coreos/etcd/command"
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
 )
 )
 
 
+func init() {
+	command.Register()
+}
+
 type raftServer struct {
 type raftServer struct {
 	*raft.Server
 	*raft.Server
 	version        string
 	version        string
@@ -26,10 +31,9 @@ type raftServer struct {
 	tlsInfo        *TLSInfo
 	tlsInfo        *TLSInfo
 	followersStats *raftFollowersStats
 	followersStats *raftFollowersStats
 	serverStats    *raftServerStats
 	serverStats    *raftServerStats
+	MaxClusterSize int
 }
 }
 
 
-//var r *raftServer
-
 func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
 func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
 
 
 	raftWrapper := &raftServer{
 	raftWrapper := &raftServer{
@@ -68,9 +72,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
 
 
 // Start the raft server
 // Start the raft server
 func (r *raftServer) ListenAndServe() {
 func (r *raftServer) ListenAndServe() {
-	// Setup commands.
-	registerCommands()
-
 	// LoadSnapshot
 	// LoadSnapshot
 	if snapshot {
 	if snapshot {
 		err := r.LoadSnapshot()
 		err := r.LoadSnapshot()
@@ -314,16 +315,3 @@ func (r *raftServer) PeerStats() []byte {
 	}
 	}
 	return nil
 	return nil
 }
 }
-
-// Register commands to raft server
-func registerCommands() {
-	raft.RegisterCommand(&JoinCommand{})
-	raft.RegisterCommand(&RemoveCommand{})
-	raft.RegisterCommand(&GetCommand{})
-	raft.RegisterCommand(&DeleteCommand{})
-	raft.RegisterCommand(&WatchCommand{})
-	raft.RegisterCommand(&TestAndSetCommand{})
-
-	raft.RegisterCommand(&CreateCommand{})
-	raft.RegisterCommand(&UpdateCommand{})
-}

+ 0 - 210
raft_stats.go

@@ -1,210 +0,0 @@
-package main
-
-import (
-	"math"
-	"sync"
-	"time"
-
-	"github.com/coreos/go-raft"
-)
-
-const (
-	queueCapacity = 200
-)
-
-// packageStats represent the stats we need for a package.
-// It has sending time and the size of the package.
-type packageStats struct {
-	sendingTime time.Time
-	size        int
-}
-
-// NewPackageStats creates a pacakgeStats and return the pointer to it.
-func NewPackageStats(now time.Time, size int) *packageStats {
-	return &packageStats{
-		sendingTime: now,
-		size:        size,
-	}
-}
-
-// Time return the sending time of the package.
-func (ps *packageStats) Time() time.Time {
-	return ps.sendingTime
-}
-
-type raftServerStats struct {
-	Name      string    `json:"name"`
-	State     string    `json:"state"`
-	StartTime time.Time `json:"startTime"`
-
-	LeaderInfo struct {
-		Name      string `json:"leader"`
-		Uptime    string `json:"uptime"`
-		startTime time.Time
-	} `json:"leaderInfo"`
-
-	RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt,"`
-	RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
-	RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`
-
-	SendAppendRequestCnt uint64  `json:"sendAppendRequestCnt"`
-	SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
-	SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
-
-	sendRateQueue *statsQueue
-	recvRateQueue *statsQueue
-}
-
-func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
-	ss.State = raft.Follower
-	if leaderName != ss.LeaderInfo.Name {
-		ss.LeaderInfo.Name = leaderName
-		ss.LeaderInfo.startTime = time.Now()
-	}
-
-	ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
-	ss.RecvAppendRequestCnt++
-}
-
-func (ss *raftServerStats) SendAppendReq(pkgSize int) {
-	now := time.Now()
-
-	if ss.State != raft.Leader {
-		ss.State = raft.Leader
-		ss.LeaderInfo.Name = ss.Name
-		ss.LeaderInfo.startTime = now
-	}
-
-	ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
-
-	ss.SendAppendRequestCnt++
-}
-
-type raftFollowersStats struct {
-	Leader    string                        `json:"leader"`
-	Followers map[string]*raftFollowerStats `json:"followers"`
-}
-
-type raftFollowerStats struct {
-	Latency struct {
-		Current           float64 `json:"current"`
-		Average           float64 `json:"average"`
-		averageSquare     float64
-		StandardDeviation float64 `json:"standardDeviation"`
-		Minimum           float64 `json:"minimum"`
-		Maximum           float64 `json:"maximum"`
-	} `json:"latency"`
-
-	Counts struct {
-		Fail    uint64 `json:"fail"`
-		Success uint64 `json:"success"`
-	} `json:"counts"`
-}
-
-// Succ function update the raftFollowerStats with a successful send
-func (ps *raftFollowerStats) Succ(d time.Duration) {
-	total := float64(ps.Counts.Success) * ps.Latency.Average
-	totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare
-
-	ps.Counts.Success++
-
-	ps.Latency.Current = float64(d) / (1000000.0)
-
-	if ps.Latency.Current > ps.Latency.Maximum {
-		ps.Latency.Maximum = ps.Latency.Current
-	}
-
-	if ps.Latency.Current < ps.Latency.Minimum {
-		ps.Latency.Minimum = ps.Latency.Current
-	}
-
-	ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success)
-	ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success)
-
-	// sdv = sqrt(avg(x^2) - avg(x)^2)
-	ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average)
-}
-
-// Fail function update the raftFollowerStats with a unsuccessful send
-func (ps *raftFollowerStats) Fail() {
-	ps.Counts.Fail++
-}
-
-type statsQueue struct {
-	items        [queueCapacity]*packageStats
-	size         int
-	front        int
-	back         int
-	totalPkgSize int
-	rwl          sync.RWMutex
-}
-
-func (q *statsQueue) Len() int {
-	return q.size
-}
-
-func (q *statsQueue) PkgSize() int {
-	return q.totalPkgSize
-}
-
-// FrontAndBack gets the front and back elements in the queue
-// We must grab front and back together with the protection of the lock
-func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) {
-	q.rwl.RLock()
-	defer q.rwl.RUnlock()
-	if q.size != 0 {
-		return q.items[q.front], q.items[q.back]
-	}
-	return nil, nil
-}
-
-// Insert function insert a packageStats into the queue and update the records
-func (q *statsQueue) Insert(p *packageStats) {
-	q.rwl.Lock()
-	defer q.rwl.Unlock()
-
-	q.back = (q.back + 1) % queueCapacity
-
-	if q.size == queueCapacity { //dequeue
-		q.totalPkgSize -= q.items[q.front].size
-		q.front = (q.back + 1) % queueCapacity
-	} else {
-		q.size++
-	}
-
-	q.items[q.back] = p
-	q.totalPkgSize += q.items[q.back].size
-
-}
-
-// Rate function returns the package rate and byte rate
-func (q *statsQueue) Rate() (float64, float64) {
-	front, back := q.frontAndBack()
-
-	if front == nil || back == nil {
-		return 0, 0
-	}
-
-	if time.Now().Sub(back.Time()) > time.Second {
-		q.Clear()
-		return 0, 0
-	}
-
-	sampleDuration := back.Time().Sub(front.Time())
-
-	pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second)
-
-	br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second)
-
-	return pr, br
-}
-
-// Clear function clear up the statsQueue
-func (q *statsQueue) Clear() {
-	q.rwl.Lock()
-	defer q.rwl.Unlock()
-	q.back = -1
-	q.front = 0
-	q.size = 0
-	q.totalPkgSize = 0
-}

+ 84 - 0
server/join_command.go

@@ -0,0 +1,84 @@
+package server
+
+import (
+	"encoding/binary"
+	"fmt"
+	"path"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+)
+
+func init() {
+	raft.RegisterCommand(&JoinCommand{})
+}
+
+// The JoinCommand adds a node to the cluster.
+type JoinCommand struct {
+	RaftVersion    string `json:"raftVersion"`
+	Name           string `json:"name"`
+	RaftURL        string `json:"raftURL"`
+	EtcdURL        string `json:"etcdURL"`
+	MaxClusterSize int    `json:"maxClusterSize"`
+}
+
+func NewJoinCommand(version, name, raftUrl, etcdUrl string, maxClusterSize int) *JoinCommand {
+	return &JoinCommand{
+		RaftVersion:    version,
+		Name:           name,
+		RaftURL:        raftUrl,
+		EtcdURL:        etcdUrl,
+		MaxClusterSize: maxClusterSize,
+	}
+}
+
+// The name of the join command in the log
+func (c *JoinCommand) CommandName() string {
+	return "etcd:join"
+}
+
+// Join a server to the cluster
+func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(*store.Store)
+	r, _ := server.Context().(*RaftServer)
+
+	// check if the join command is from a previous machine, who lost all its previous log.
+	e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term())
+
+	b := make([]byte, 8)
+	binary.PutUvarint(b, server.CommitIndex())
+
+	if e != nil {
+		return b, nil
+	}
+
+	// check machine number in the cluster
+	if s.MachineCount() == c.MaxClusterSize {
+		log.Debug("Reject join request from ", c.Name)
+		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
+	}
+
+	addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
+
+	// add peer in raft
+	err := server.AddPeer(c.Name, "")
+
+	// add machine in etcd storage
+	key := path.Join("_etcd/machines", c.Name)
+	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
+	s.Create(key, value, false, false, store.Permanent, server.CommitIndex(), server.Term())
+
+	// add peer stats
+	if c.Name != r.Name() {
+		r.followersStats.Followers[c.Name] = &raftFollowerStats{}
+		r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
+	}
+
+	return b, err
+}
+
+func (c *JoinCommand) NodeName() string {
+	return c.Name
+}

+ 25 - 0
server/package_stats.go

@@ -0,0 +1,25 @@
+package server
+
+import (
+	"time"
+)
+
+// packageStats represent the stats we need for a package.
+// It has sending time and the size of the package.
+type packageStats struct {
+	sendingTime time.Time
+	size        int
+}
+
+// NewPackageStats creates a pacakgeStats and return the pointer to it.
+func NewPackageStats(now time.Time, size int) *packageStats {
+	return &packageStats{
+		sendingTime: now,
+		size:        size,
+	}
+}
+
+// Time return the sending time of the package.
+func (ps *packageStats) Time() time.Time {
+	return ps.sendingTime
+}

+ 56 - 0
server/raft_follower_stats.go

@@ -0,0 +1,56 @@
+package server
+
+import (
+	"math"
+	"time"
+)
+
+type raftFollowersStats struct {
+	Leader    string                        `json:"leader"`
+	Followers map[string]*raftFollowerStats `json:"followers"`
+}
+
+type raftFollowerStats struct {
+	Latency struct {
+		Current           float64 `json:"current"`
+		Average           float64 `json:"average"`
+		averageSquare     float64
+		StandardDeviation float64 `json:"standardDeviation"`
+		Minimum           float64 `json:"minimum"`
+		Maximum           float64 `json:"maximum"`
+	} `json:"latency"`
+
+	Counts struct {
+		Fail    uint64 `json:"fail"`
+		Success uint64 `json:"success"`
+	} `json:"counts"`
+}
+
+// Succ function update the raftFollowerStats with a successful send
+func (ps *raftFollowerStats) Succ(d time.Duration) {
+	total := float64(ps.Counts.Success) * ps.Latency.Average
+	totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare
+
+	ps.Counts.Success++
+
+	ps.Latency.Current = float64(d) / (1000000.0)
+
+	if ps.Latency.Current > ps.Latency.Maximum {
+		ps.Latency.Maximum = ps.Latency.Current
+	}
+
+	if ps.Latency.Current < ps.Latency.Minimum {
+		ps.Latency.Minimum = ps.Latency.Current
+	}
+
+	ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success)
+	ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success)
+
+	// sdv = sqrt(avg(x^2) - avg(x)^2)
+	ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average)
+}
+
+// Fail function update the raftFollowerStats with a unsuccessful send
+func (ps *raftFollowerStats) Fail() {
+	ps.Counts.Fail++
+}

+ 55 - 0
server/raft_server_stats.go

@@ -0,0 +1,55 @@
+package server
+
+import (
+	"time"
+
+	"github.com/coreos/go-raft"
+)
+
+type raftServerStats struct {
+	Name      string    `json:"name"`
+	State     string    `json:"state"`
+	StartTime time.Time `json:"startTime"`
+
+	LeaderInfo struct {
+		Name      string `json:"leader"`
+		Uptime    string `json:"uptime"`
+		startTime time.Time
+	} `json:"leaderInfo"`
+
+	RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt,"`
+	RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
+	RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`
+
+	SendAppendRequestCnt uint64  `json:"sendAppendRequestCnt"`
+	SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
+	SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
+
+	sendRateQueue *statsQueue
+	recvRateQueue *statsQueue
+}
+
+func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
+	ss.State = raft.Follower
+	if leaderName != ss.LeaderInfo.Name {
+		ss.LeaderInfo.Name = leaderName
+		ss.LeaderInfo.startTime = time.Now()
+	}
+
+	ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
+	ss.RecvAppendRequestCnt++
+}
+
+func (ss *raftServerStats) SendAppendReq(pkgSize int) {
+	now := time.Now()
+
+	if ss.State != raft.Leader {
+		ss.State = raft.Leader
+		ss.LeaderInfo.Name = ss.Name
+		ss.LeaderInfo.startTime = now
+	}
+
+	ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
+
+	ss.SendAppendRequestCnt++
+}

+ 68 - 0
server/remove_command.go

@@ -0,0 +1,68 @@
+package server
+
+import (
+	"encoding/binary"
+	"path"
+
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+)
+
+func init() {
+	raft.RegisterCommand(&RemoveCommand{})
+}
+
+// The RemoveCommand removes a server from the cluster.
+type RemoveCommand struct {
+	Name string `json:"name"`
+}
+
+// The name of the remove command in the log
+func (c *RemoveCommand) CommandName() string {
+	return "etcd:remove"
+}
+
+// Remove a server from the cluster
+func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(*store.Store)
+	r, _ := server.Context().(*RaftServer)
+
+	// remove machine in etcd storage
+	key := path.Join("_etcd/machines", c.Name)
+
+	_, err := s.Delete(key, false, server.CommitIndex(), server.Term())
+	// delete from stats
+	delete(r.followersStats.Followers, c.Name)
+
+	if err != nil {
+		return []byte{0}, err
+	}
+
+	// remove peer in raft
+	err = server.RemovePeer(c.Name)
+
+	if err != nil {
+		return []byte{0}, err
+	}
+
+	if c.Name == server.Name() {
+		// the removed node is this node
+
+		// if the node is not replaying the previous logs
+		// and the node has sent out a join request in this
+		// start. It is sure that this node received a new remove
+		// command and need to be removed
+		if server.CommitIndex() > r.joinIndex && r.joinIndex != 0 {
+			debugf("server [%s] is removed", server.Name())
+			os.Exit(0)
+		} else {
+			// else ignore remove
+			debugf("ignore previous remove command.")
+		}
+	}
+
+	b := make([]byte, 8)
+	binary.PutUvarint(b, server.CommitIndex())
+
+	return b, err
+}

+ 7 - 9
server/server.go

@@ -1,16 +1,19 @@
 package server
 package server
 
 
 import (
 import (
-	"github.com/gorilla/mux"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
+
+	"github.com/coreos/etcd/command"
+	"github.com/coreos/go-raft"
+	"github.com/gorilla/mux"
 )
 )
 
 
 // The Server provides an HTTP interface to the underlying store.
 // The Server provides an HTTP interface to the underlying store.
 type Server interface {
 type Server interface {
-    CommitIndex() uint64 
-    Term() uint64 
-    Dispatch(Command, http.ResponseWriter, *http.Request)
+	CommitIndex() uint64
+	Term() uint64
+	Dispatch(command.Command, http.ResponseWriter, *http.Request)
 }
 }
 
 
 // This is the default implementation of the Server interface.
 // This is the default implementation of the Server interface.
@@ -55,11 +58,6 @@ func (s *server) Term() uint64 {
 	return c.raftServer.Term()
 	return c.raftServer.Term()
 }
 }
 
 
-// Executes a command against the Raft server.
-func (s *server) Do(c Command, localOnly bool) (interface{}, error) {
-	return c.raftServer.Do(s.RaftServer().Server)
-}
-
 func (s *server) installV1() {
 func (s *server) installV1() {
 	s.handleFunc("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET")
 	s.handleFunc("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET")
 	s.handleFunc("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")
 	s.handleFunc("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")

+ 88 - 0
server/stats_queue.go

@@ -0,0 +1,88 @@
+package server
+
+import (
+	"sync"
+)
+
+const (
+	queueCapacity = 200
+)
+
+type statsQueue struct {
+	items        [queueCapacity]*packageStats
+	size         int
+	front        int
+	back         int
+	totalPkgSize int
+	rwl          sync.RWMutex
+}
+
+func (q *statsQueue) Len() int {
+	return q.size
+}
+
+func (q *statsQueue) PkgSize() int {
+	return q.totalPkgSize
+}
+
+// FrontAndBack gets the front and back elements in the queue
+// We must grab front and back together with the protection of the lock
+func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) {
+	q.rwl.RLock()
+	defer q.rwl.RUnlock()
+	if q.size != 0 {
+		return q.items[q.front], q.items[q.back]
+	}
+	return nil, nil
+}
+
+// Insert function insert a packageStats into the queue and update the records
+func (q *statsQueue) Insert(p *packageStats) {
+	q.rwl.Lock()
+	defer q.rwl.Unlock()
+
+	q.back = (q.back + 1) % queueCapacity
+
+	if q.size == queueCapacity { //dequeue
+		q.totalPkgSize -= q.items[q.front].size
+		q.front = (q.back + 1) % queueCapacity
+	} else {
+		q.size++
+	}
+
+	q.items[q.back] = p
+	q.totalPkgSize += q.items[q.back].size
+
+}
+
+// Rate function returns the package rate and byte rate
+func (q *statsQueue) Rate() (float64, float64) {
+	front, back := q.frontAndBack()
+
+	if front == nil || back == nil {
+		return 0, 0
+	}
+
+	if time.Now().Sub(back.Time()) > time.Second {
+		q.Clear()
+		return 0, 0
+	}
+
+	sampleDuration := back.Time().Sub(front.Time())
+
+	pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second)
+
+	br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second)
+
+	return pr, br
+}
+
+// Clear function clear up the statsQueue
+func (q *statsQueue) Clear() {
+	q.rwl.Lock()
+	defer q.rwl.Unlock()
+	q.back = -1
+	q.front = 0
+	q.size = 0
+	q.totalPkgSize = 0
+}

+ 11 - 0
server/tls_config.go

@@ -0,0 +1,11 @@
+package server
+
+import (
+	"crypto/tls"
+)
+
+type TLSConfig struct {
+	Scheme string
+	Server tls.Config
+	Client tls.Config
+}

+ 7 - 0
server/tls_info.go

@@ -0,0 +1,7 @@
+package server
+
+type TLSInfo struct {
+	CertFile string `json:"CertFile"`
+	KeyFile  string `json:"KeyFile"`
+	CAFile   string `json:"CAFile"`
+}

+ 5 - 5
server/v1/delete_key_handler.go

@@ -1,15 +1,15 @@
 package v1
 package v1
 
 
 import (
 import (
-    "encoding/json"
-    "github.com/coreos/etcd/store"
-    "net/http"
+	"encoding/json"
+	"github.com/coreos/etcd/store"
+	"net/http"
 )
 )
 
 
 // Removes a key from the store.
 // Removes a key from the store.
 func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
-    vars := mux.Vars(req)
-    key := "/" + vars["key"]
+	vars := mux.Vars(req)
+	key := "/" + vars["key"]
 	command := &DeleteCommand{Key: key}
 	command := &DeleteCommand{Key: key}
 	return s.Dispatch(command, w, req)
 	return s.Dispatch(command, w, req)
 }
 }

+ 33 - 33
server/v1/v1.go

@@ -7,44 +7,44 @@ import (
 
 
 // The Server interface provides all the methods required for the v1 API.
 // The Server interface provides all the methods required for the v1 API.
 type Server interface {
 type Server interface {
-    CommitIndex() uint64 
-    Term() uint64 
-    Dispatch(http.ResponseWriter, *http.Request, Command) 
+	CommitIndex() uint64
+	Term() uint64
+	Dispatch(http.ResponseWriter, *http.Request, Command)
 }
 }
 
 
 // Converts an event object into a response object.
 // Converts an event object into a response object.
 func eventToResponse(event *store.Event) interface{} {
 func eventToResponse(event *store.Event) interface{} {
-    if !event.Dir {
-        response := &store.Response{
-            Action:     event.Action,
-            Key:        event.Key,
-            Value:      event.Value,
-            PrevValue:  event.PrevValue,
-            Index:      event.Index,
-            TTL:        event.TTL,
-            Expiration: event.Expiration,
-        }
+	if !event.Dir {
+		response := &store.Response{
+			Action:     event.Action,
+			Key:        event.Key,
+			Value:      event.Value,
+			PrevValue:  event.PrevValue,
+			Index:      event.Index,
+			TTL:        event.TTL,
+			Expiration: event.Expiration,
+		}
 
 
-        if response.Action == store.Create || response.Action == store.Update {
-            response.Action = "set"
-            if response.PrevValue == "" {
-                response.NewKey = true
-            }
-        }
+		if response.Action == store.Create || response.Action == store.Update {
+			response.Action = "set"
+			if response.PrevValue == "" {
+				response.NewKey = true
+			}
+		}
 
 
-        return response
-    } else {
-        responses := make([]*store.Response, len(event.KVPairs))
+		return response
+	} else {
+		responses := make([]*store.Response, len(event.KVPairs))
 
 
-        for i, kv := range event.KVPairs {
-            responses[i] = &store.Response{
-                Action: event.Action,
-                Key:    kv.Key,
-                Value:  kv.Value,
-                Dir:    kv.Dir,
-                Index:  event.Index,
-            }
-        }
-        return responses
-    }
+		for i, kv := range event.KVPairs {
+			responses[i] = &store.Response{
+				Action: event.Action,
+				Key:    kv.Key,
+				Value:  kv.Value,
+				Dir:    kv.Dir,
+				Index:  event.Index,
+			}
+		}
+		return responses
+	}
 }
 }

+ 230 - 230
server/v2/handlers.go

@@ -1,15 +1,15 @@
 package main
 package main
 
 
 import (
 import (
-    "encoding/json"
-    "fmt"
-    "net/http"
-    "strconv"
-    "strings"
-
-    etcdErr "github.com/coreos/etcd/error"
-    "github.com/coreos/etcd/store"
-    "github.com/coreos/go-raft"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strconv"
+	"strings"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
 )
 )
 
 
 //-------------------------------------------------------------------
 //-------------------------------------------------------------------
@@ -17,22 +17,22 @@ import (
 //-------------------------------------------------------------------
 //-------------------------------------------------------------------
 
 
 func NewEtcdMuxer() *http.ServeMux {
 func NewEtcdMuxer() *http.ServeMux {
-    // external commands
-    router := mux.NewRouter()
-    etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer))
-    etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler))
-    etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler))
-    etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler))
-    etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
-    etcdMux.HandleFunc("/test/", TestHttpHandler)
-
-    // backward support
-    etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
-    etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
-    etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
-    etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
-
-    return etcdMux
+	// external commands
+	router := mux.NewRouter()
+	etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer))
+	etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler))
+	etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler))
+	etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler))
+	etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
+	etcdMux.HandleFunc("/test/", TestHttpHandler)
+
+	// backward support
+	etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
+	etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
+	etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
+	etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
+
+	return etcdMux
 }
 }
 
 
 type errorHandler func(http.ResponseWriter, *http.Request) error
 type errorHandler func(http.ResponseWriter, *http.Request) error
@@ -41,50 +41,50 @@ type errorHandler func(http.ResponseWriter, *http.Request) error
 // provided allowed origins and sets the Access-Control-Allow-Origin header if
 // provided allowed origins and sets the Access-Control-Allow-Origin header if
 // there is a match.
 // there is a match.
 func addCorsHeader(w http.ResponseWriter, r *http.Request) {
 func addCorsHeader(w http.ResponseWriter, r *http.Request) {
-    val, ok := corsList["*"]
-    if val && ok {
-        w.Header().Add("Access-Control-Allow-Origin", "*")
-        return
-    }
-
-    requestOrigin := r.Header.Get("Origin")
-    val, ok = corsList[requestOrigin]
-    if val && ok {
-        w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
-        return
-    }
+	val, ok := corsList["*"]
+	if val && ok {
+		w.Header().Add("Access-Control-Allow-Origin", "*")
+		return
+	}
+
+	requestOrigin := r.Header.Get("Origin")
+	val, ok = corsList[requestOrigin]
+	if val && ok {
+		w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
+		return
+	}
 }
 }
 
 
 func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-    addCorsHeader(w, r)
-    if e := fn(w, r); e != nil {
-        if etcdErr, ok := e.(*etcdErr.Error); ok {
-            debug("Return error: ", (*etcdErr).Error())
-            etcdErr.Write(w)
-        } else {
-            http.Error(w, e.Error(), http.StatusInternalServerError)
-        }
-    }
+	addCorsHeader(w, r)
+	if e := fn(w, r); e != nil {
+		if etcdErr, ok := e.(*etcdErr.Error); ok {
+			debug("Return error: ", (*etcdErr).Error())
+			etcdErr.Write(w)
+		} else {
+			http.Error(w, e.Error(), http.StatusInternalServerError)
+		}
+	}
 }
 }
 
 
 // Multiplex GET/POST/DELETE request to corresponding handlers
 // Multiplex GET/POST/DELETE request to corresponding handlers
 func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
 
 
-    switch req.Method {
-    case "GET":
-        return e.GetHttpHandler(w, req)
-    case "POST":
-        return e.CreateHttpHandler(w, req)
-    case "PUT":
-        return e.UpdateHttpHandler(w, req)
-    case "DELETE":
-        return e.DeleteHttpHandler(w, req)
-    default:
-        w.WriteHeader(http.StatusMethodNotAllowed)
-        return nil
-    }
-
-    return nil
+	switch req.Method {
+	case "GET":
+		return e.GetHttpHandler(w, req)
+	case "POST":
+		return e.CreateHttpHandler(w, req)
+	case "PUT":
+		return e.UpdateHttpHandler(w, req)
+	case "DELETE":
+		return e.DeleteHttpHandler(w, req)
+	default:
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return nil
+	}
+
+	return nil
 }
 }
 
 
 //--------------------------------------
 //--------------------------------------
@@ -93,111 +93,111 @@ func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error
 //--------------------------------------
 //--------------------------------------
 
 
 func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    key := getNodePath(req.URL.Path)
+	key := getNodePath(req.URL.Path)
 
 
-    debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+	debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 
-    value := req.FormValue("value")
+	value := req.FormValue("value")
 
 
-    expireTime, err := durationToExpireTime(req.FormValue("ttl"))
+	expireTime, err := durationToExpireTime(req.FormValue("ttl"))
 
 
-    if err != nil {
-        return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
-    }
+	if err != nil {
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
+	}
 
 
-    command := &CreateCommand{
-        Key:        key,
-        Value:      value,
-        ExpireTime: expireTime,
-    }
+	command := &CreateCommand{
+		Key:        key,
+		Value:      value,
+		ExpireTime: expireTime,
+	}
 
 
-    if req.FormValue("incremental") == "true" {
-        command.IncrementalSuffix = true
-    }
+	if req.FormValue("incremental") == "true" {
+		command.IncrementalSuffix = true
+	}
 
 
-    return e.dispatchEtcdCommand(command, w, req)
+	return e.dispatchEtcdCommand(command, w, req)
 
 
 }
 }
 
 
 func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    key := getNodePath(req.URL.Path)
+	key := getNodePath(req.URL.Path)
 
 
-    debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+	debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 
-    req.ParseForm()
+	req.ParseForm()
 
 
-    value := req.Form.Get("value")
+	value := req.Form.Get("value")
 
 
-    expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
+	expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
 
 
-    if err != nil {
-        return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
-    }
+	if err != nil {
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
+	}
 
 
-    // update should give at least one option
-    if value == "" && expireTime.Sub(store.Permanent) == 0 {
-        return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
-    }
+	// update should give at least one option
+	if value == "" && expireTime.Sub(store.Permanent) == 0 {
+		return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
+	}
 
 
-    prevValue, valueOk := req.Form["prevValue"]
+	prevValue, valueOk := req.Form["prevValue"]
 
 
-    prevIndexStr, indexOk := req.Form["prevIndex"]
+	prevIndexStr, indexOk := req.Form["prevIndex"]
 
 
-    if !valueOk && !indexOk { // update without test
-        command := &UpdateCommand{
-            Key:        key,
-            Value:      value,
-            ExpireTime: expireTime,
-        }
+	if !valueOk && !indexOk { // update without test
+		command := &UpdateCommand{
+			Key:        key,
+			Value:      value,
+			ExpireTime: expireTime,
+		}
 
 
-        return e.dispatchEtcdCommand(command, w, req)
+		return e.dispatchEtcdCommand(command, w, req)
 
 
-    } else { // update with test
-        var prevIndex uint64
+	} else { // update with test
+		var prevIndex uint64
 
 
-        if indexOk {
-            prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
+		if indexOk {
+			prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
 
 
-            // bad previous index
-            if err != nil {
-                return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
-            }
-        } else {
-            prevIndex = 0
-        }
+			// bad previous index
+			if err != nil {
+				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
+			}
+		} else {
+			prevIndex = 0
+		}
 
 
-        command := &TestAndSetCommand{
-            Key:       key,
-            Value:     value,
-            PrevValue: prevValue[0],
-            PrevIndex: prevIndex,
-        }
+		command := &TestAndSetCommand{
+			Key:       key,
+			Value:     value,
+			PrevValue: prevValue[0],
+			PrevIndex: prevIndex,
+		}
 
 
-        return e.dispatchEtcdCommand(command, w, req)
-    }
+		return e.dispatchEtcdCommand(command, w, req)
+	}
 
 
 }
 }
 
 
 // Delete Handler
 // Delete Handler
 func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    key := getNodePath(req.URL.Path)
+	key := getNodePath(req.URL.Path)
 
 
-    debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+	debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 
-    command := &DeleteCommand{
-        Key: key,
-    }
+	command := &DeleteCommand{
+		Key: key,
+	}
 
 
-    if req.FormValue("recursive") == "true" {
-        command.Recursive = true
-    }
+	if req.FormValue("recursive") == "true" {
+		command.Recursive = true
+	}
 
 
-    return e.dispatchEtcdCommand(command, w, req)
+	return e.dispatchEtcdCommand(command, w, req)
 }
 }
 
 
 // Dispatch the command to leader
 // Dispatch the command to leader
 func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
-    return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
+	return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
 }
 }
 
 
 //--------------------------------------
 //--------------------------------------
@@ -208,157 +208,157 @@ func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *
 
 
 // Handler to return the current leader's raft address
 // Handler to return the current leader's raft address
 func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    r := e.raftServer
+	r := e.raftServer
 
 
-    leader := r.Leader()
+	leader := r.Leader()
 
 
-    if leader != "" {
-        w.WriteHeader(http.StatusOK)
-        raftURL, _ := nameToRaftURL(leader)
-        w.Write([]byte(raftURL))
+	if leader != "" {
+		w.WriteHeader(http.StatusOK)
+		raftURL, _ := nameToRaftURL(leader)
+		w.Write([]byte(raftURL))
 
 
-        return nil
-    } else {
-        return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
-    }
+		return nil
+	} else {
+		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
+	}
 }
 }
 
 
 // Handler to return all the known machines in the current cluster
 // Handler to return all the known machines in the current cluster
 func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    machines := e.raftServer.getMachines(nameToEtcdURL)
+	machines := e.raftServer.getMachines(nameToEtcdURL)
 
 
-    w.WriteHeader(http.StatusOK)
-    w.Write([]byte(strings.Join(machines, ", ")))
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte(strings.Join(machines, ", ")))
 
 
-    return nil
+	return nil
 }
 }
 
 
 // Handler to return the current version of etcd
 // Handler to return the current version of etcd
 func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    w.WriteHeader(http.StatusOK)
-    fmt.Fprintf(w, "etcd %s", releaseVersion)
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "etcd %s", releaseVersion)
 
 
-    return nil
+	return nil
 }
 }
 
 
 // Handler to return the basic stats of etcd
 // Handler to return the basic stats of etcd
 func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    option := req.URL.Path[len("/v1/stats/"):]
-    w.WriteHeader(http.StatusOK)
-
-    r := e.raftServer
-
-    switch option {
-    case "self":
-        w.Write(r.Stats())
-    case "leader":
-        if r.State() == raft.Leader {
-            w.Write(r.PeerStats())
-        } else {
-            leader := r.Leader()
-            // current no leader
-            if leader == "" {
-                return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
-            }
-            hostname, _ := nameToEtcdURL(leader)
-            redirect(hostname, w, req)
-        }
-    case "store":
-        w.Write(etcdStore.JsonStats())
-    }
-
-    return nil
+	option := req.URL.Path[len("/v1/stats/"):]
+	w.WriteHeader(http.StatusOK)
+
+	r := e.raftServer
+
+	switch option {
+	case "self":
+		w.Write(r.Stats())
+	case "leader":
+		if r.State() == raft.Leader {
+			w.Write(r.PeerStats())
+		} else {
+			leader := r.Leader()
+			// current no leader
+			if leader == "" {
+				return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
+			}
+			hostname, _ := nameToEtcdURL(leader)
+			redirect(hostname, w, req)
+		}
+	case "store":
+		w.Write(etcdStore.JsonStats())
+	}
+
+	return nil
 }
 }
 
 
 func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
-    var err error
-    var event interface{}
+	var err error
+	var event interface{}
 
 
-    r := e.raftServer
+	r := e.raftServer
 
 
-    debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+	debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 
-    if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
-        // help client to redirect the request to the current leader
-        leader := r.Leader()
-        hostname, _ := nameToEtcdURL(leader)
-        redirect(hostname, w, req)
-        return nil
-    }
+	if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
+		// help client to redirect the request to the current leader
+		leader := r.Leader()
+		hostname, _ := nameToEtcdURL(leader)
+		redirect(hostname, w, req)
+		return nil
+	}
 
 
-    key := getNodePath(req.URL.Path)
+	key := getNodePath(req.URL.Path)
 
 
-    recursive := req.FormValue("recursive")
+	recursive := req.FormValue("recursive")
 
 
-    if req.FormValue("wait") == "true" { // watch
-        command := &WatchCommand{
-            Key: key,
-        }
+	if req.FormValue("wait") == "true" { // watch
+		command := &WatchCommand{
+			Key: key,
+		}
 
 
-        if recursive == "true" {
-            command.Recursive = true
-        }
+		if recursive == "true" {
+			command.Recursive = true
+		}
 
 
-        indexStr := req.FormValue("wait_index")
-        if indexStr != "" {
-            sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
+		indexStr := req.FormValue("wait_index")
+		if indexStr != "" {
+			sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
 
 
-            if err != nil {
-                return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
-            }
+			if err != nil {
+				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
+			}
 
 
-            command.SinceIndex = sinceIndex
-        }
+			command.SinceIndex = sinceIndex
+		}
 
 
-        event, err = command.Apply(r.Server)
+		event, err = command.Apply(r.Server)
 
 
-    } else { //get
+	} else { //get
 
 
-        command := &GetCommand{
-            Key: key,
-        }
+		command := &GetCommand{
+			Key: key,
+		}
 
 
-        sorted := req.FormValue("sorted")
-        if sorted == "true" {
-            command.Sorted = true
-        }
+		sorted := req.FormValue("sorted")
+		if sorted == "true" {
+			command.Sorted = true
+		}
 
 
-        if recursive == "true" {
-            command.Recursive = true
-        }
+		if recursive == "true" {
+			command.Recursive = true
+		}
 
 
-        event, err = command.Apply(r.Server)
-    }
+		event, err = command.Apply(r.Server)
+	}
 
 
-    if err != nil {
-        return err
+	if err != nil {
+		return err
 
 
-    } else {
-        event, _ := event.(*store.Event)
-        bytes, _ := json.Marshal(event)
+	} else {
+		event, _ := event.(*store.Event)
+		bytes, _ := json.Marshal(event)
 
 
-        w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
-        w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
-        w.WriteHeader(http.StatusOK)
+		w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
+		w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
+		w.WriteHeader(http.StatusOK)
 
 
-        w.Write(bytes)
+		w.Write(bytes)
 
 
-        return nil
-    }
+		return nil
+	}
 
 
 }
 }
 
 
 // TestHandler
 // TestHandler
 func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
 func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
-    testType := req.URL.Path[len("/test/"):]
+	testType := req.URL.Path[len("/test/"):]
 
 
-    if testType == "speed" {
-        directSet()
-        w.WriteHeader(http.StatusOK)
-        w.Write([]byte("speed test success"))
+	if testType == "speed" {
+		directSet()
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte("speed test success"))
 
 
-        return
-    }
+		return
+	}
 
 
-    w.WriteHeader(http.StatusBadRequest)
+	w.WriteHeader(http.StatusBadRequest)
 }
 }

+ 10 - 0
store/store.go

@@ -401,6 +401,16 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
 	return n, nil
 	return n, nil
 }
 }
 
 
+// Returns the number of machines in the cluster.
+func (s *Store) MachineCount() int {
+	e, err := s.Get("/_etcd/machines", false, false, 0, 0)
+	if err != nil {
+		return 0
+	}
+
+	return len(e.KVPairs)
+}
+
 // Save function saves the static state of the store system.
 // Save function saves the static state of the store system.
 // Save function will not be able to save the state of watchers.
 // Save function will not be able to save the state of watchers.
 // Save function will not save the parent field of the node. Or there will
 // Save function will not save the parent field of the node. Or there will

+ 0 - 40
util.go

@@ -15,7 +15,6 @@ import (
 
 
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
-	"github.com/coreos/go-log/log"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
 )
 )
 
 
@@ -172,45 +171,6 @@ func getNodePath(urlPath string) string {
 	return urlPath[pathPrefixLen:]
 	return urlPath[pathPrefixLen:]
 }
 }
 
 
-//--------------------------------------
-// Log
-//--------------------------------------
-
-var logger *log.Logger = log.New("etcd", false,
-	log.CombinedSink(os.Stdout, "[%s] %s %-9s | %s\n", []string{"prefix", "time", "priority", "message"}))
-
-func infof(format string, v ...interface{}) {
-	logger.Infof(format, v...)
-}
-
-func debugf(format string, v ...interface{}) {
-	if verbose {
-		logger.Debugf(format, v...)
-	}
-}
-
-func debug(v ...interface{}) {
-	if verbose {
-		logger.Debug(v...)
-	}
-}
-
-func warnf(format string, v ...interface{}) {
-	logger.Warningf(format, v...)
-}
-
-func warn(v ...interface{}) {
-	logger.Warning(v...)
-}
-
-func fatalf(format string, v ...interface{}) {
-	logger.Fatalf(format, v...)
-}
-
-func fatal(v ...interface{}) {
-	logger.Fatalln(v...)
-}
-
 //--------------------------------------
 //--------------------------------------
 // CPU profile
 // CPU profile
 //--------------------------------------
 //--------------------------------------