Browse Source

merge errHandling branch

Xiang Li 12 years ago
parent
commit
33872501cf
5 changed files with 156 additions and 48 deletions
  1. 50 11
      client_handlers.go
  2. 2 2
      command.go
  3. 2 1
      etcd.go
  4. 23 0
      store/error.go
  5. 79 34
      store/store.go

+ 50 - 11
client_handlers.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"github.com/coreos/etcd/store"
 	"net/http"
 	"strconv"
 	"time"
@@ -40,6 +41,13 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	command.Key = key
 
 	command.Value = req.FormValue("value")
+
+	if len(command.Value) == 0 {
+		(*w).WriteHeader(http.StatusBadRequest)
+		(*w).Write([]byte("Set: Value Required\n"))
+		return
+	}
+
 	strDuration := req.FormValue("ttl")
 
 	var err error
@@ -66,6 +74,19 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 	command.PrevValue = req.FormValue("prevValue")
 	command.Value = req.FormValue("value")
+
+	if len(command.Value) == 0 {
+		(w).WriteHeader(http.StatusBadRequest)
+		(w).Write([]byte("TestAndSet: Value Required\n"))
+		return
+	}
+
+	if len(command.PrevValue) == 0 {
+		(w).WriteHeader(http.StatusBadRequest)
+		(w).Write([]byte("TestAndSet: PrevValue Required\n"))
+		return
+	}
+
 	strDuration := req.FormValue("ttl")
 
 	var err error
@@ -97,22 +118,31 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
 	if raftServer.State() == "leader" {
 		if body, err := raftServer.Do(c); err != nil {
-			warn("Commit failed %v", err)
-			(*w).WriteHeader(http.StatusInternalServerError)
-			return
-		} else {
-			(*w).WriteHeader(http.StatusOK)
+			if _, ok := err.(store.NotFoundError); ok {
+				http.NotFound((*w), req)
+				return
+			}
 
-			if body == nil {
+			if _, ok := err.(store.TestFail); ok {
+				(*w).WriteHeader(http.StatusBadRequest)
+				(*w).Write([]byte(err.Error() + "\n"))
 				return
 			}
+			(*w).WriteHeader(http.StatusInternalServerError)
+			return
+		} else {
 
 			body, ok := body.([]byte)
 			if !ok {
 				panic("wrong type")
 			}
 
-			(*w).Write(body)
+			if body == nil {
+				http.NotFound((*w), req)
+			} else {
+				(*w).WriteHeader(http.StatusOK)
+				(*w).Write(body)
+			}
 			return
 		}
 	} else {
@@ -174,18 +204,23 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	command.Key = key
 
 	if body, err := command.Apply(raftServer); err != nil {
-		warn("raftd: Unable to write file: %v", err)
+
+		if _, ok := err.(store.NotFoundError); ok {
+			http.NotFound((*w), req)
+			return
+		}
+
 		(*w).WriteHeader(http.StatusInternalServerError)
 		return
 	} else {
-		(*w).WriteHeader(http.StatusOK)
-
 		body, ok := body.([]byte)
 		if !ok {
 			panic("wrong type")
 		}
 
+		(*w).WriteHeader(http.StatusOK)
 		(*w).Write(body)
+
 		return
 	}
 
@@ -201,7 +236,10 @@ func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
 	command.Prefix = prefix
 
 	if body, err := command.Apply(raftServer); err != nil {
-		warn("Unable to write file: %v", err)
+		if _, ok := err.(store.NotFoundError); ok {
+			http.NotFound(w, req)
+			return
+		}
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	} else {
@@ -238,6 +276,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
 		if err != nil {
 			w.WriteHeader(http.StatusBadRequest)
+			(w).Write([]byte("Watch From Index: Vaild Index Required\n"))
 		}
 		command.SinceIndex = sinceIndex
 

+ 2 - 2
command.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"encoding/json"
+	//"errors"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 	"time"
@@ -60,8 +61,7 @@ func (c *GetCommand) CommandName() string {
 
 // Get the value of key
 func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
-	res := etcdStore.Get(c.Key)
-	return json.Marshal(res)
+	return etcdStore.Get(c.Key)
 }
 
 // List command

+ 2 - 1
etcd.go

@@ -90,10 +90,11 @@ const (
 const (
 	ELECTIONTIMTOUT  = 200 * time.Millisecond
 	HEARTBEATTIMEOUT = 50 * time.Millisecond
+	
 	// Timeout for internal raft http connection
 	// The original timeout for http is 45 seconds 
 	// which is too long for our usage.
- 	HTTPTIMEOUT      = time.Second
+	HTTPTIMEOUT = time.Second
 )
 
 //------------------------------------------------------------------------------

+ 23 - 0
store/error.go

@@ -0,0 +1,23 @@
+package store
+
+import (
+	"fmt"
+	)
+
+type NotFoundError string
+
+func (e NotFoundError) Error() string {
+	return fmt.Sprintf("Key %s Not Found", string(e))
+}
+
+type NotFile string
+
+func (e NotFile) Error() string {
+	return fmt.Sprintf("Try to set value to a dir %s", string(e))
+}
+
+type TestFail string
+
+func (e TestFail) Error() string {
+	return fmt.Sprintf("Test %s fails", string(e))
+}

+ 79 - 34
store/store.go

@@ -64,17 +64,17 @@ type Node struct {
 type Response struct {
 	Action    string `json:"action"`
 	Key       string `json:"key"`
-	PrevValue string `json:"prevValue"`
-	Value     string `json:"value"`
+	PrevValue string `json:"prevValue,omitempty"`
+	Value     string `json:"value,omitempty"`
 
 	// If the key existed before the action, this field should be true
 	// If the key did not exist before the action, this field should be false
-	Exist bool `json:"exist"`
+	NewKey    bool `json:"newKey,omitempty"`
 
-	Expiration time.Time `json:"expiration"`
+	Expiration *time.Time `json:"expiration,omitempty"`
 
 	// Time to live in second
-	TTL int64 `json:"ttl"`
+	TTL int64 `json:"ttl,omitempty"`
 
 	// The command index of the raft machine when the command is executed
 	Index uint64 `json:"index"`
@@ -142,6 +142,14 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
 
 	isExpire := !expireTime.Equal(PERMANENT)
 
+	// base response
+	resp := Response{
+		Action: "SET", 
+		Key: key, 
+		Value: value, 
+		Index: index,
+	}
+
 	// When the slow follower receive the set command
 	// the key may be expired, we should not add the node
 	// also if the node exist, we need to delete the node
@@ -154,10 +162,9 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
 	// Update ttl
 	if isExpire {
 		TTL = int64(expireTime.Sub(time.Now()) / time.Second)
-	} else {
-		// For permanent value, we set ttl to -1
-		TTL = -1
-	}
+		resp.Expiration = &expireTime
+		resp.TTL = TTL 
+	} 
 
 	// Get the node
 	node, ok := s.Tree.get(key)
@@ -186,7 +193,7 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
 		// Update the information of the node
 		s.Tree.set(key, Node{value, expireTime, node.update})
 
-		resp := Response{"SET", key, node.Value, value, true, expireTime, TTL, index}
+		resp.PrevValue = node.Value
 
 		s.watcher.notify(resp)
 
@@ -207,13 +214,18 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
 
 		update := make(chan time.Time)
 
-		s.Tree.set(key, Node{value, expireTime, update})
+		ok := s.Tree.set(key, Node{value, expireTime, update})
+
+		if !ok {
+			err := NotFile(key)
+			return nil, err
+		}
 
 		if isExpire {
 			go s.monitorExpiration(key, update, expireTime)
 		}
 
-		resp := Response{"SET", key, "", value, false, expireTime, TTL, index}
+		resp.NewKey = true
 
 		msg, err := json.Marshal(resp)
 
@@ -232,7 +244,19 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
 }
 
 // Get the value of the key
-func (s *Store) Get(key string) Response {
+func (s *Store) Get(key string) ([]byte, error) {
+	resp := s.internalGet(key)
+
+	if resp != nil {
+		return json.Marshal(resp)
+	} else {
+		err := NotFoundError(key)
+		return nil, err
+	}
+}	
+
+// Get the value of the key and return the raw response
+func (s *Store) internalGet(key string) *Response {
 
 	key = path.Clean("/" + key)
 
@@ -244,21 +268,29 @@ func (s *Store) Get(key string) Response {
 
 		isExpire = !node.ExpireTime.Equal(PERMANENT)
 
+		resp := &Response{
+			Action: "GET", 
+			Key: key, 
+			Value: node.Value, 
+			Index: s.Index,
+		}
+
 		// Update ttl
 		if isExpire {
 			TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
-		} else {
-			TTL = -1
-		}
+			resp.Expiration = &node.ExpireTime 
+			resp.TTL = TTL 
+		} 
 
-		return Response{"GET", key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index}
+		return resp
 
 	} else {
 		// we do not found the key
-		return Response{"GET", key, "", "", false, time.Unix(0, 0), 0, s.Index}
+		return nil
 	}
 }
 
+
 // List all the item in the prefix
 func (s *Store) List(prefix string) ([]byte, error) {
 
@@ -273,7 +305,8 @@ func (s *Store) List(prefix string) ([]byte, error) {
 		}
 	}
 
-	return json.Marshal(ln)
+	err := NotFoundError(prefix)
+	return nil, err
 }
 
 // Delete the key
@@ -288,20 +321,25 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
 
 	if ok {
 
+		resp := Response{
+			Action: "DELETE", 
+			Key: key, 
+			PrevValue: node.Value,   
+			Index: index,
+		}
+
 		if node.ExpireTime.Equal(PERMANENT) {
 
 			s.Tree.delete(key)
 
 		} else {
-
+			resp.Expiration = &node.ExpireTime
 			// Kill the expire go routine
 			node.update <- PERMANENT
 			s.Tree.delete(key)
 
 		}
 
-		resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, index}
-
 		msg, err := json.Marshal(resp)
 
 		s.watcher.notify(resp)
@@ -317,28 +355,29 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
 		return msg, err
 
 	} else {
-
-		resp := Response{"DELETE", key, "", "", false, time.Unix(0, 0), 0, index}
-
-		s.addToResponseMap(index, &resp)
-
-		return json.Marshal(resp)
+		err := NotFoundError(key)
+		return nil, err
 	}
 }
 
 // Set the value of the key to the value if the given prevValue is equal to the value of the key
 func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
-	resp := s.Get(key)
+	resp := s.internalGet(key)
+
+	if resp == nil {
+		err := NotFoundError(key)
+		return nil, err 
+	}
 
-	if resp.PrevValue == prevValue {
+	if resp.Value == prevValue {
 
 		// If test success, do set
 		return s.Set(key, value, expireTime, index)
 	} else {
 
-		// If fails, return the result of get which contains the current
-		// status of the key-value pair
-		return json.Marshal(resp)
+		// If fails, return err
+		err := TestFail(fmt.Sprintf("%s==%s", resp.Value, prevValue))
+		return nil, err
 	}
 
 }
@@ -371,7 +410,13 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime
 
 				s.Tree.delete(key)
 
-				resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, s.Index}
+				resp := Response{
+					Action: "DELETE", 
+					Key: key, 
+					PrevValue: node.Value, 
+					Expiration: &node.ExpireTime,  
+					Index: s.Index,
+				}
 
 				msg, err := json.Marshal(resp)