Browse Source

Merge pull request #47 from xiangli-cmu/master

support version and basic stats
Xiang Li 12 years ago
parent
commit
0310268d27
8 changed files with 95 additions and 32 deletions
  1. 13 1
      client_handlers.go
  2. 8 5
      command.go
  3. 1 0
      error.go
  4. 2 0
      etcd.go
  5. 11 19
      machines.go
  6. 25 0
      store/stats.go
  7. 33 7
      store/store.go
  8. 2 0
      version.go

+ 13 - 1
client_handlers.go

@@ -131,7 +131,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 
 			if body == nil {
 				(*w).WriteHeader(http.StatusNotFound)
-				(*w).Write(newJsonError(100, err.Error()))
+				(*w).Write(newJsonError(300, "Empty result from raft"))
 			} else {
 				body, ok := body.([]byte)
 				// this should not happen
@@ -225,6 +225,18 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 }
 
+// Handler to return the current version of etcd
+func VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte(releaseVersion))
+}
+
+// Handler to return the basic stats of etcd
+func StatsHttpHandler(w http.ResponseWriter, req *http.Request) {
+	w.WriteHeader(http.StatusOK)
+	w.Write(etcdStore.Stats())
+}
+
 // Get Handler
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]

+ 8 - 5
command.go

@@ -120,6 +120,13 @@ func (c *JoinCommand) CommandName() string {
 // Join a server to the cluster
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 
+	// check if the join command is from a previous machine, who lost all its previous log.
+	response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name))
+
+	if response != nil {
+		return []byte("join success"), nil
+	}
+
 	// check machine number in the cluster
 	num := machineNum()
 	if num == maxClusterSize {
@@ -129,12 +136,8 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	// add peer in raft
 	err := raftServer.AddPeer(c.Name)
 
-	// add machine in etcd
-	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
-
 	// add machine in etcd storage
-	nodeName := fmt.Sprintf("%s%d", "node", raftServer.CommitIndex())
-	key := path.Join("_etcd/machines", nodeName)
+	key := path.Join("_etcd/machines", c.Name)
 	value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 

+ 1 - 0
error.go

@@ -20,6 +20,7 @@ func init() {
 	errors[201] = "PrevValue is Required in POST form"
 	errors[202] = "The given TTL in POST form is not a number"
 	errors[203] = "The given index in POST form is not a number"
+	
 	// raft related errors
 	errors[300] = "Raft Internal Error"
 	errors[301] = "During Leader Election"

+ 2 - 0
etcd.go

@@ -434,6 +434,8 @@ func startClientTransport(port int, st int) {
 	http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
 	http.HandleFunc("/leader", LeaderHttpHandler)
 	http.HandleFunc("/machines", MachinesHttpHandler)
+	http.HandleFunc("/", VersionHttpHandler)
+	http.HandleFunc("/stats", StatsHttpHandler)
 
 	switch st {
 

+ 11 - 19
machines.go

@@ -2,34 +2,26 @@ package main
 
 import (
 	"fmt"
+	"path"
+	"strings"
 )
 
-type machine struct {
-	hostname   string
-	raftPort   int
-	clientPort int
-}
-
-var machinesMap = map[string]machine{}
-
-func addMachine(name string, hostname string, raftPort int, clientPort int) {
-
-	machinesMap[name] = machine{hostname, raftPort, clientPort}
+func getClientAddr(name string) (string, bool) {
+	response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
 
-}
+	values := strings.Split(response[0].Value, ",")
 
-func getClientAddr(name string) (string, bool) {
-	machine, ok := machinesMap[name]
-	if !ok {
-		return "", false
-	}
+	hostname := values[0]
+	clientPort := values[2]
 
-	addr := fmt.Sprintf("%s:%v", machine.hostname, machine.clientPort)
+	addr := fmt.Sprintf("%s:%s", hostname, clientPort)
 
 	return addr, true
 }
 
 // machineNum returns the number of machines in the cluster
 func machineNum() int {
-	return len(machinesMap)
+	response, _ := etcdStore.RawGet("_etcd/machines")
+
+	return len(response)
 }

+ 25 - 0
store/stats.go

@@ -0,0 +1,25 @@
+package store
+
+import (
+	"encoding/json"
+)
+
+type EtcdStats struct {
+	// Number of get requests
+	Gets uint64 `json:"gets"`
+
+	// Number of sets requests
+	Sets uint64 `json:"sets"`
+
+	// Number of delete requests
+	Deletes uint64 `json:"deletes"`
+
+	// Number of testAndSet requests
+	TestAndSets uint64 `json:"testAndSets"`
+}
+
+// Stats returns the basic statistics information of etcd storage
+func (s *Store) Stats() []byte {
+	b, _ := json.Marshal(s.BasicStats)
+	return b
+}

+ 33 - 7
store/store.go

@@ -42,6 +42,9 @@ type Store struct {
 
 	// Current index of the raft machine
 	Index uint64
+
+	// Basic statistics information of etcd storage
+	BasicStats EtcdStats
 }
 
 // A Node represents a Value in the Key-Value pair in the store
@@ -139,6 +142,9 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
 	//Update index
 	s.Index = index
 
+	//Update stats
+	s.BasicStats.Sets++
+
 	key = path.Clean("/" + key)
 
 	isExpire := !expireTime.Equal(PERMANENT)
@@ -284,13 +290,29 @@ func (s *Store) internalGet(key string) *Response {
 // If key is a file return the file
 // If key is a directory reuturn an array of files
 func (s *Store) Get(key string) ([]byte, error) {
+	resps, err := s.RawGet(key)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if len(resps) == 1 {
+		return json.Marshal(resps[0])
+	}
+
+	return json.Marshal(resps)
+}
+
+func (s *Store) RawGet(key string) ([]*Response, error) {
+	// Update stats
+	s.BasicStats.Gets++
 
 	key = path.Clean("/" + key)
 
 	nodes, keys, dirs, ok := s.Tree.list(key)
 
 	if ok {
-		resps := make([]Response, len(nodes))
+		resps := make([]*Response, len(nodes))
 		for i := 0; i < len(nodes); i++ {
 
 			var TTL int64
@@ -298,7 +320,7 @@ func (s *Store) Get(key string) ([]byte, error) {
 
 			isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
 
-			resps[i] = Response{
+			resps[i] = &Response{
 				Action: "GET",
 				Index:  s.Index,
 				Key:    path.Join(key, keys[i]),
@@ -318,10 +340,8 @@ func (s *Store) Get(key string) ([]byte, error) {
 			}
 
 		}
-		if len(resps) == 1 {
-			return json.Marshal(resps[0])
-		}
-		return json.Marshal(resps)
+
+		return resps, nil
 	}
 
 	err := NotFoundError(key)
@@ -331,9 +351,12 @@ func (s *Store) Get(key string) ([]byte, error) {
 // Delete the key
 func (s *Store) Delete(key string, index uint64) ([]byte, error) {
 
+	// Update stats
+	s.BasicStats.Deletes++
+
 	key = path.Clean("/" + key)
 
-	//Update index
+	// Update index
 	s.Index = index
 
 	node, ok := s.Tree.get(key)
@@ -381,6 +404,9 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
 
 // Set the value of the key to the value if the given prevValue is equal to the value of the key
 func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
+	// Update stats
+	s.BasicStats.TestAndSets++
+
 	resp := s.internalGet(key)
 
 	if resp == nil {

+ 2 - 0
version.go

@@ -1,3 +1,5 @@
 package main
 
 var version = "v1"
+
+var releaseVersion = "etcd pre-0.1"