Browse Source

(feat) v1 apt backward support

Xiang Li 12 years ago
parent
commit
baaaf24f70
14 changed files with 464 additions and 124 deletions
  1. 10 10
      command.go
  2. 9 2
      error/error.go
  3. 248 0
      etcd_handler_v1.go
  4. 22 11
      etcd_handlers.go
  5. 3 3
      etcd_test.go
  6. 7 8
      store/event.go
  7. 16 24
      store/node.go
  8. 26 0
      store/response_v1.go
  9. 2 2
      store/stats_test.go
  10. 62 23
      store/store.go
  11. 32 31
      store/store_test.go
  12. 3 1
      store/watcher.go
  13. 23 8
      util.go
  14. 1 1
      version.go

+ 10 - 10
command.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"encoding/binary"
-	"encoding/json"
 	"fmt"
 	"os"
 	"path"
@@ -31,6 +30,7 @@ type CreateCommand struct {
 	Value             string    `json:"value"`
 	ExpireTime        time.Time `json:"expireTime"`
 	IncrementalSuffix bool      `json:"incrementalSuffix"`
+	Force             bool      `json:"force"`
 }
 
 // The name of the create command in the log
@@ -40,14 +40,14 @@ func (c *CreateCommand) CommandName() string {
 
 // Create node
 func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
-	e, err := etcdStore.Create(c.Key, c.Value, c.IncrementalSuffix, c.ExpireTime, server.CommitIndex(), server.Term())
+	e, err := etcdStore.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())
 
 	if err != nil {
 		debug(err)
 		return nil, err
 	}
 
-	return json.Marshal(e)
+	return e, nil
 }
 
 // Update command
@@ -71,7 +71,7 @@ func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
 		return nil, err
 	}
 
-	return json.Marshal(e)
+	return e, nil
 }
 
 // TestAndSet command
@@ -98,7 +98,7 @@ func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
 		return nil, err
 	}
 
-	return json.Marshal(e)
+	return e, nil
 }
 
 // Get command
@@ -122,7 +122,7 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
 		return nil, err
 	}
 
-	return json.Marshal(e)
+	return e, nil
 }
 
 // Delete command
@@ -145,7 +145,7 @@ func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
 		return nil, err
 	}
 
-	return json.Marshal(e)
+	return e, nil
 }
 
 // Watch command
@@ -169,7 +169,7 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 
 	e := <-eventChan
 
-	return json.Marshal(e)
+	return e, nil
 }
 
 // JoinCommand
@@ -211,7 +211,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	num := machineNum()
 	if num == maxClusterSize {
 		debug("Reject join request from ", c.Name)
-		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "")
+		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", raftServer.CommitIndex(), raftServer.Term())
 	}
 
 	addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
@@ -222,7 +222,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	// add machine in etcd storage
 	key := path.Join("_etcd/machines", c.Name)
 	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
-	etcdStore.Create(key, value, false, store.Permanent, raftServer.CommitIndex(), raftServer.Term())
+	etcdStore.Create(key, value, false, false, store.Permanent, raftServer.CommitIndex(), raftServer.Term())
 
 	// add peer stats
 	if c.Name != r.Name() {

+ 9 - 2
error/error.go

@@ -2,6 +2,7 @@ package error
 
 import (
 	"encoding/json"
+	"fmt"
 	"net/http"
 )
 
@@ -62,13 +63,17 @@ type Error struct {
 	ErrorCode int    `json:"errorCode"`
 	Message   string `json:"message"`
 	Cause     string `json:"cause,omitempty"`
+	Index     uint64 `json:"index"`
+	Term      uint64 `json:"term"`
 }
 
-func NewError(errorCode int, cause string) Error {
-	return Error{
+func NewError(errorCode int, cause string, index uint64, term uint64) *Error {
+	return &Error{
 		ErrorCode: errorCode,
 		Message:   errors[errorCode],
 		Cause:     cause,
+		Index:     index,
+		Term:      term,
 	}
 }
 
@@ -87,6 +92,8 @@ func (e Error) toJsonString() string {
 }
 
 func (e Error) Write(w http.ResponseWriter) {
+	w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index))
+	w.Header().Add("X-Etcd-Term", fmt.Sprint(e.Term))
 	// 3xx is reft internal error
 	if e.ErrorCode/100 == 3 {
 		http.Error(w, e.toJsonString(), http.StatusInternalServerError)

+ 248 - 0
etcd_handler_v1.go

@@ -0,0 +1,248 @@
+package main
+
+import (
+	"encoding/json"
+	"net/http"
+	"strconv"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
+)
+
+//-------------------------------------------------------------------
+// Handlers to handle etcd-store related request via etcd url
+//-------------------------------------------------------------------
+// Multiplex GET/POST/DELETE request to corresponding handlers
+func MultiplexerV1(w http.ResponseWriter, req *http.Request) error {
+
+	switch req.Method {
+	case "GET":
+		return GetHttpHandlerV1(w, req)
+	case "POST":
+		return SetHttpHandlerV1(w, req)
+	case "PUT":
+		return SetHttpHandlerV1(w, req)
+	case "DELETE":
+		return DeleteHttpHandlerV1(w, req)
+	default:
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return nil
+	}
+}
+
+//--------------------------------------
+// State sensitive handlers
+// Set/Delete will dispatch to leader
+//--------------------------------------
+
+// Set Command Handler
+func SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+	key := req.URL.Path[len("/v1/keys/"):]
+
+	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
+
+	req.ParseForm()
+
+	value := req.Form.Get("value")
+
+	if len(value) == 0 {
+		return etcdErr.NewError(200, "Set", store.UndefIndex, store.UndefTerm)
+	}
+
+	strDuration := req.Form.Get("ttl")
+
+	expireTime, err := durationToExpireTime(strDuration)
+
+	if err != nil {
+		return etcdErr.NewError(202, "Set", store.UndefIndex, store.UndefTerm)
+	}
+
+	if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
+		command := &TestAndSetCommand{
+			Key:        key,
+			Value:      value,
+			PrevValue:  prevValueArr[0],
+			ExpireTime: expireTime,
+		}
+
+		return dispatchEtcdCommandV1(command, w, req)
+
+	} else {
+		command := &CreateCommand{
+			Key:        key,
+			Value:      value,
+			ExpireTime: expireTime,
+			Force:      true,
+		}
+
+		return dispatchEtcdCommandV1(command, w, req)
+	}
+}
+
+// Delete Handler
+func DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+	key := req.URL.Path[len("/v1/keys/"):]
+
+	debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
+
+	command := &DeleteCommand{
+		Key: key,
+	}
+
+	return dispatchEtcdCommandV1(command, w, req)
+}
+
+//--------------------------------------
+// State non-sensitive handlers
+// will not dispatch to leader
+// TODO: add sensitive version for these
+// command?
+//--------------------------------------
+
+// Get Handler
+func GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+	key := req.URL.Path[len("/v1/keys/"):]
+
+	debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
+
+	command := &GetCommand{
+		Key: key,
+	}
+
+	if event, err := command.Apply(r.Server); err != nil {
+		return err
+	} else {
+		event, _ := event.(*store.Event)
+
+		response := eventToResponse(event)
+		bytes, _ := json.Marshal(response)
+
+		w.WriteHeader(http.StatusOK)
+
+		w.Write(bytes)
+
+		return nil
+	}
+
+}
+
+// Watch handler
+func WatchHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+	key := req.URL.Path[len("/v1/watch/"):]
+
+	command := &WatchCommand{
+		Key: key,
+	}
+
+	if req.Method == "GET" {
+		debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
+		command.SinceIndex = 0
+
+	} else if req.Method == "POST" {
+		// watch from a specific index
+
+		debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
+		content := req.FormValue("index")
+
+		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
+		if err != nil {
+			return etcdErr.NewError(203, "Watch From Index", store.UndefIndex, store.UndefTerm)
+		}
+		command.SinceIndex = sinceIndex
+
+	} else {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return nil
+	}
+
+	if event, err := command.Apply(r.Server); err != nil {
+		return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
+	} else {
+		event, _ := event.(*store.Event)
+
+		response := eventToResponse(event)
+		bytes, _ := json.Marshal(response)
+
+		w.WriteHeader(http.StatusOK)
+
+		w.Write(bytes)
+		return nil
+	}
+
+}
+
+// Dispatch the command to leader
+func dispatchEtcdCommandV1(c Command, w http.ResponseWriter, req *http.Request) error {
+	return dispatchV1(c, w, req, nameToEtcdURL)
+}
+
+func dispatchV1(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
+	if r.State() == raft.Leader {
+		if event, err := r.Do(c); err != nil {
+			return err
+		} else {
+			if event == nil {
+				return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
+			}
+
+			event, _ := event.(*store.Event)
+
+			response := eventToResponse(event)
+			bytes, _ := json.Marshal(response)
+
+			w.WriteHeader(http.StatusOK)
+			w.Write(bytes)
+			return nil
+
+		}
+
+	} else {
+		leader := r.Leader()
+		// current no leader
+		if leader == "" {
+			return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
+		}
+		url, _ := toURL(leader)
+
+		redirect(url, w, req)
+
+		return nil
+	}
+}
+
+func eventToResponse(event *store.Event) interface{} {
+	if !event.Dir {
+		response := &store.Response{
+			Action:     event.Action,
+			Key:        event.Key,
+			Value:      event.Value,
+			PrevValue:  event.PrevValue,
+			Index:      event.Index,
+			TTL:        event.TTL,
+			Expiration: event.Expiration,
+		}
+
+		if response.Action == store.Create || response.Action == store.Update {
+			response.Action = "set"
+			if response.PrevValue == "" {
+				response.NewKey = true
+			}
+		}
+
+		return response
+	} else {
+		responses := make([]*store.Response, len(event.KVPairs))
+
+		for i, kv := range event.KVPairs {
+			responses[i] = &store.Response{
+				Action: event.Action,
+				Key:    kv.Key,
+				Value:  kv.Value,
+				Dir:    kv.Dir,
+				Index:  event.Index,
+			}
+		}
+		return responses
+	}
+}

+ 22 - 11
etcd_handlers.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"strconv"
@@ -23,6 +24,11 @@ func NewEtcdMuxer() *http.ServeMux {
 	etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
 	etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
 	etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
+
+	etcdMux.Handle("/v1/keys/", errorHandler(MultiplexerV1))
+	etcdMux.Handle("/v1/leader", errorHandler(LeaderHttpHandler))
+	etcdMux.Handle("/v1/machines", errorHandler(MachinesHttpHandler))
+	etcdMux.Handle("/v1/stats/", errorHandler(StatsHttpHandler))
 	etcdMux.HandleFunc("/test/", TestHttpHandler)
 	return etcdMux
 }
@@ -50,8 +56,8 @@ func addCorsHeader(w http.ResponseWriter, r *http.Request) {
 func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	addCorsHeader(w, r)
 	if e := fn(w, r); e != nil {
-		if etcdErr, ok := e.(etcdErr.Error); ok {
-			debug("Return error: ", etcdErr.Error())
+		if etcdErr, ok := e.(*etcdErr.Error); ok {
+			debug("Return error: ", (*etcdErr).Error())
 			etcdErr.Write(w)
 		} else {
 			http.Error(w, e.Error(), http.StatusInternalServerError)
@@ -94,7 +100,7 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	expireTime, err := durationToExpireTime(req.FormValue("ttl"))
 
 	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create")
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
 	}
 
 	command := &CreateCommand{
@@ -123,12 +129,12 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
 
 	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update")
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
 	}
 
 	// update should give at least one option
 	if value == "" && expireTime.Sub(store.Permanent) == 0 {
-		return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update")
+		return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
 	}
 
 	prevValue, valueOk := req.Form["prevValue"]
@@ -152,7 +158,7 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 			// bad previous index
 			if err != nil {
-				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update")
+				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
 			}
 		} else {
 			prevIndex = 0
@@ -209,7 +215,7 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 		return nil
 	} else {
-		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "")
+		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
 	}
 }
 
@@ -246,7 +252,7 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 			leader := r.Leader()
 			// current no leader
 			if leader == "" {
-				return etcdErr.NewError(300, "")
+				return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 			}
 			hostname, _ := nameToEtcdURL(leader)
 			redirect(hostname, w, req)
@@ -289,7 +295,7 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 			sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
 
 			if err != nil {
-				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index")
+				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
 			}
 
 			command.SinceIndex = sinceIndex
@@ -319,9 +325,14 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		return err
 
 	} else {
-		event, _ := event.([]byte)
+		event, _ := event.(*store.Event)
+		bytes, _ := json.Marshal(event)
+
+		w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
+		w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
 		w.WriteHeader(http.StatusOK)
-		w.Write(event)
+
+		w.Write(bytes)
 
 		return nil
 	}

+ 3 - 3
etcd_test.go

@@ -49,7 +49,7 @@ func TestSingleNode(t *testing.T) {
 
 	result, err = c.Set("foo", "bar", 100)
 
-	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 99 {
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 {
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -61,7 +61,7 @@ func TestSingleNode(t *testing.T) {
 	// First, we'll test we can change the value if we get it write
 	result, match, err := c.TestAndSet("foo", "bar", "foobar", 100)
 
-	if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 99 || !match {
+	if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 100 || !match {
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -223,7 +223,7 @@ func templateTestSimpleMultiNode(t *testing.T, tls bool) {
 
 	result, err = c.Set("foo", "bar", 100)
 
-	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 99 {
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 {
 		if err != nil {
 			t.Fatal(err)
 		}

+ 7 - 8
store/event.go

@@ -118,28 +118,24 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 
 	if e.Index == UndefIndex {
 		e.Index = eh.LastIndex
-		duped = 1
-	}
-
-	if e.Term == UndefTerm {
 		e.Term = eh.LastTerm
 		duped = 1
 	}
 
 	eh.Queue.insert(e)
 
-	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
-
 	eh.LastIndex = e.Index
 	eh.LastTerm = e.Term
 	eh.DupCnt += duped
 
+	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
+
 	return e
 }
 
 // scan function is enumerating events from the index in history and
 // stops till the first point where the key has identified prefix
-func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
+func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 
@@ -150,7 +146,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 		return nil,
 			etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
 				fmt.Sprintf("the requested history has been cleared [%v/%v]",
-					eh.StartIndex, index))
+					eh.StartIndex, index), UndefIndex, UndefTerm)
 	}
 
 	// the index should locate before the size of the queue minus the duplicate count
@@ -191,6 +187,9 @@ func (eh *EventHistory) clone() *EventHistory {
 	return &EventHistory{
 		StartIndex: eh.StartIndex,
 		Queue:      clonedQueue,
+		LastIndex:  eh.LastIndex,
+		LastTerm:   eh.LastTerm,
+		DupCnt:     eh.DupCnt,
 	}
 
 }

+ 16 - 24
store/node.go

@@ -63,7 +63,7 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node
 // Remove function remove the node.
 // If the node is a directory and recursive is true, the function will recursively remove
 // add nodes under the receiver node.
-func (n *Node) Remove(recursive bool, callback func(path string)) error {
+func (n *Node) Remove(recursive bool, callback func(path string)) *etcdErr.Error {
 	if n.status == removed { // check race between remove and expire
 		return nil
 	}
@@ -89,7 +89,7 @@ func (n *Node) Remove(recursive bool, callback func(path string)) error {
 	}
 
 	if !recursive {
-		return etcdErr.NewError(etcdErr.EcodeNotFile, "")
+		return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
 	}
 
 	for _, child := range n.Children { // delete all children
@@ -114,9 +114,9 @@ func (n *Node) Remove(recursive bool, callback func(path string)) error {
 
 // Read function gets the value of the node.
 // If the receiver node is not a key-value pair, a "Not A File" error will be returned.
-func (n *Node) Read() (string, error) {
+func (n *Node) Read() (string, *etcdErr.Error) {
 	if n.IsDir() {
-		return "", etcdErr.NewError(etcdErr.EcodeNotFile, "")
+		return "", etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
 	}
 
 	return n.Value, nil
@@ -124,9 +124,9 @@ func (n *Node) Read() (string, error) {
 
 // Write function set the value of the node to the given value.
 // If the receiver node is a directory, a "Not A File" error will be returned.
-func (n *Node) Write(value string, index uint64, term uint64) error {
+func (n *Node) Write(value string, index uint64, term uint64) *etcdErr.Error {
 	if n.IsDir() {
-		return etcdErr.NewError(etcdErr.EcodeNotFile, "")
+		return etcdErr.NewError(etcdErr.EcodeNotFile, "", UndefIndex, UndefTerm)
 	}
 
 	n.Value = value
@@ -138,9 +138,9 @@ func (n *Node) Write(value string, index uint64, term uint64) error {
 
 // List function return a slice of nodes under the receiver node.
 // If the receiver node is not a directory, a "Not A Directory" error will be returned.
-func (n *Node) List() ([]*Node, error) {
+func (n *Node) List() ([]*Node, *etcdErr.Error) {
 	if !n.IsDir() {
-		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, "")
+		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, "", UndefIndex, UndefTerm)
 	}
 
 	nodes := make([]*Node, len(n.Children))
@@ -154,25 +154,17 @@ func (n *Node) List() ([]*Node, error) {
 	return nodes, nil
 }
 
-// GetFile function returns the file node under the directory node.
+// GetChild function returns the child node under the directory node.
 // On success, it returns the file node
-// If the node that calls this function is not a directory, it returns
-// Not Directory Error
-// If the node corresponding to the name string is not file, it returns
-// Not File Error
-func (n *Node) GetFile(name string) (*Node, error) {
+func (n *Node) GetChild(name string) (*Node, *etcdErr.Error) {
 	if !n.IsDir() {
-		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, n.Path)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, n.Path, UndefIndex, UndefTerm)
 	}
 
-	f, ok := n.Children[name]
+	child, ok := n.Children[name]
 
 	if ok {
-		if !f.IsDir() {
-			return f, nil
-		} else {
-			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, f.Path)
-		}
+		return child, nil
 	}
 
 	return nil, nil
@@ -182,9 +174,9 @@ func (n *Node) GetFile(name string) (*Node, error) {
 // If the receiver is not a directory, a "Not A Directory" error will be returned.
 // If there is a existing node with the same name under the directory, a "Already Exist"
 // error will be returned
-func (n *Node) Add(child *Node) error {
+func (n *Node) Add(child *Node) *etcdErr.Error {
 	if !n.IsDir() {
-		return etcdErr.NewError(etcdErr.EcodeNotDir, "")
+		return etcdErr.NewError(etcdErr.EcodeNotDir, "", UndefIndex, UndefTerm)
 	}
 
 	_, name := path.Split(child.Path)
@@ -192,7 +184,7 @@ func (n *Node) Add(child *Node) error {
 	_, ok := n.Children[name]
 
 	if ok {
-		return etcdErr.NewError(etcdErr.EcodeNodeExist, "")
+		return etcdErr.NewError(etcdErr.EcodeNodeExist, "", UndefIndex, UndefTerm)
 	}
 
 	n.Children[name] = child

+ 26 - 0
store/response_v1.go

@@ -0,0 +1,26 @@
+package store
+
+import (
+	"time"
+)
+
+// The response from the store to the user who issue a command
+type Response struct {
+	Action    string `json:"action"`
+	Key       string `json:"key"`
+	Dir       bool   `json:"dir,omitempty"`
+	PrevValue string `json:"prevValue,omitempty"`
+	Value     string `json:"value,omitempty"`
+
+	// If the key did not exist before the action,
+	// this field should be set to true
+	NewKey bool `json:"newKey,omitempty"`
+
+	Expiration *time.Time `json:"expiration,omitempty"`
+
+	// Time to live in second
+	TTL int64 `json:"ttl,omitempty"`
+
+	// The command index of the raft machine when the command is executed
+	Index uint64 `json:"index"`
+}

+ 2 - 2
store/stats_test.go

@@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
+		_, err := s.Create(k, "bar", false, false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 			SetFail++
 		} else {
@@ -146,7 +146,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1)
+		_, err := s.Create(k, "bar", false, false, time.Now().Add(time.Second*3), i, 1)
 		if err != nil {
 			SetFail++
 		} else {

+ 62 - 23
store/store.go

@@ -95,12 +95,13 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term
 // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
 // If the node has already existed, create will fail.
 // If any node on the path is a file, create will fail.
-func (s *Store) Create(nodePath string, value string, incrementalSuffix bool,
+func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, force bool,
 	expireTime time.Time, index uint64, term uint64) (*Event, error) {
+	nodePath = path.Clean(path.Join("/", nodePath))
 
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
-	return s.internalCreate(nodePath, value, incrementalSuffix, expireTime, index, term, Create)
+	return s.internalCreate(nodePath, value, incrementalSuffix, force, expireTime, index, term, Create)
 }
 
 // Update function updates the value/ttl of the node.
@@ -109,6 +110,7 @@ func (s *Store) Create(nodePath string, value string, incrementalSuffix bool,
 func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
+	nodePath = path.Clean(path.Join("/", nodePath))
 
 	n, err := s.internalGet(nodePath, index, term)
 
@@ -124,7 +126,8 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 		if len(value) != 0 {
 			s.Stats.Inc(UpdateFail)
 
-			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
+			err := etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
+			return nil, err
 		}
 
 	} else { // if the node is a file, we can update value and ttl
@@ -154,11 +157,13 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 	value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
 
+	nodePath = path.Clean(path.Join("/", nodePath))
+
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 
 	if prevValue == "" && prevIndex == 0 { // try create
-		return s.internalCreate(nodePath, value, false, expireTime, index, term, TestAndSet)
+		return s.internalCreate(nodePath, value, false, false, expireTime, index, term, TestAndSet)
 	}
 
 	n, err := s.internalGet(nodePath, index, term)
@@ -170,7 +175,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 
 	if n.IsDir() { // can only test and set file
 		s.Stats.Inc(TestAndSetFail)
-		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
 	}
 
 	if n.Value == prevValue || n.ModifiedIndex == prevIndex {
@@ -182,6 +187,11 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 
 		n.UpdateTTL(expireTime, s)
 
+		if n.ExpireTime.Sub(Permanent) != 0 {
+			e.Expiration = &n.ExpireTime
+			e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
+		}
+
 		s.WatcherHub.notify(e)
 		s.Stats.Inc(TestAndSetSuccess)
 		return e, nil
@@ -189,12 +199,14 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 
 	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
 	s.Stats.Inc(TestAndSetFail)
-	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause)
+	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term)
 }
 
 // Delete function deletes the node at the given path.
 // If the node is a directory, recursive must be true to delete it.
 func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
+	nodePath = path.Clean(path.Join("/", nodePath))
+
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 
@@ -231,25 +243,38 @@ func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint6
 }
 
 func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
+	prefix = path.Clean(path.Join("/", prefix))
+
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 
 	s.Index, s.Term = index, term
 
+	var c <-chan *Event
+	var err *etcdErr.Error
+
 	if sinceIndex == 0 {
-		return s.WatcherHub.watch(prefix, recursive, index+1)
+		c, err = s.WatcherHub.watch(prefix, recursive, index+1)
+
+	} else {
+		c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
 	}
 
-	return s.WatcherHub.watch(prefix, recursive, sinceIndex)
+	if err != nil {
+		err.Index = index
+		err.Term = term
+	}
+
+	return c, err
 }
 
 // walk function walks all the nodePath and apply the walkFunc on each directory
-func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
+func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) {
 	components := strings.Split(nodePath, "/")
 
 	curr := s.Root
+	var err *etcdErr.Error
 
-	var err error
 	for i := 1; i < len(components); i++ {
 		if len(components[i]) == 0 { // ignore empty string
 			return curr, nil
@@ -265,7 +290,9 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string
 	return curr, nil
 }
 
-func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
+func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool,
+	expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
+
 	if incrementalSuffix { // append unique incremental suffix to the node path
 		nodePath += "_" + strconv.FormatUint(index, 10)
 	}
@@ -275,30 +302,42 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
 	// make sure we can create the node
 	_, err := s.internalGet(nodePath, index, term)
 
-	if err == nil { // key already exists
+	if err == nil && !force { // key already exists
 		s.Stats.Inc(SetFail)
-		return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath)
+		return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
 	}
 
-	etcdError, _ := err.(etcdErr.Error)
-
-	if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
+	if err != nil && err.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
 		s.Stats.Inc(SetFail)
 		return nil, err
 	}
 
-	dir, _ := path.Split(nodePath)
+	dir, newNodeName := path.Split(nodePath)
 
 	// walk through the nodePath, create dirs and get the last directory node
 	d, err := s.walk(dir, s.checkDir)
 
 	if err != nil {
 		s.Stats.Inc(SetFail)
+		fmt.Println("1: bad create")
 		return nil, err
 	}
 
 	e := newEvent(action, nodePath, s.Index, s.Term)
 
+	if force { // force will try to replace a existing file
+		n, _ := d.GetChild(newNodeName)
+		if n != nil {
+			if n.IsDir() {
+				return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
+			}
+			e.PrevValue, _ = n.Read()
+
+			n.Remove(false, nil)
+
+		}
+	}
+
 	var n *Node
 
 	if len(value) != 0 { // create file
@@ -333,16 +372,17 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
 }
 
 // InternalGet function get the node of the given nodePath.
-func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, error) {
+func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) {
 	nodePath = path.Clean(path.Join("/", nodePath))
 
 	// update file system known index and term
 	s.Index, s.Term = index, term
 
-	walkFunc := func(parent *Node, name string) (*Node, error) {
+	walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
 
 		if !parent.IsDir() {
-			return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path)
+			err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, index, term)
+			return nil, err
 		}
 
 		child, ok := parent.Children[name]
@@ -350,7 +390,7 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 			return child, nil
 		}
 
-		return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name))
+		return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), index, term)
 	}
 
 	f, err := s.walk(nodePath, walkFunc)
@@ -358,7 +398,6 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 	if err != nil {
 		return nil, err
 	}
-
 	return f, nil
 }
 
@@ -366,7 +405,7 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 // If it is a directory, this function will return the pointer to that node.
 // If it does not exist, this function will create a new directory and return the pointer to that node.
 // If it is a file, this function will return error.
-func (s *Store) checkDir(parent *Node, dirName string) (*Node, error) {
+func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
 	subDir, ok := parent.Children[dirName]
 
 	if ok {

+ 32 - 31
store/store_test.go

@@ -10,10 +10,10 @@ import (
 func TestCreateAndGet(t *testing.T) {
 	s := New()
 
-	s.Create("/foobar", "bar", false, Permanent, 1, 1)
+	s.Create("/foobar", "bar", false, false, Permanent, 1, 1)
 
 	// already exist, create should fail
-	_, err := s.Create("/foobar", "bar", false, Permanent, 1, 1)
+	_, err := s.Create("/foobar", "bar", false, false, Permanent, 1, 1)
 
 	if err == nil {
 		t.Fatal("Create should fail")
@@ -27,14 +27,14 @@ func TestCreateAndGet(t *testing.T) {
 	createAndGet(s, "/foo/foo/bar", t)
 
 	// meet file, create should fail
-	_, err = s.Create("/foo/bar/bar", "bar", false, Permanent, 2, 1)
+	_, err = s.Create("/foo/bar/bar", "bar", false, false, Permanent, 2, 1)
 
 	if err == nil {
 		t.Fatal("Create should fail")
 	}
 
 	// create a directory
-	_, err = s.Create("/fooDir", "", false, Permanent, 3, 1)
+	_, err = s.Create("/fooDir", "", false, false, Permanent, 3, 1)
 
 	if err != nil {
 		t.Fatal("Cannot create /fooDir")
@@ -47,7 +47,7 @@ func TestCreateAndGet(t *testing.T) {
 	}
 
 	// create a file under directory
-	_, err = s.Create("/fooDir/bar", "bar", false, Permanent, 4, 1)
+	_, err = s.Create("/fooDir/bar", "bar", false, false, Permanent, 4, 1)
 
 	if err != nil {
 		t.Fatal("Cannot create /fooDir/bar = bar")
@@ -57,7 +57,7 @@ func TestCreateAndGet(t *testing.T) {
 func TestUpdateFile(t *testing.T) {
 	s := New()
 
-	_, err := s.Create("/foo/bar", "bar", false, Permanent, 1, 1)
+	_, err := s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1)
 
 	if err != nil {
 		t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error())
@@ -80,24 +80,24 @@ func TestUpdateFile(t *testing.T) {
 	}
 
 	// create a directory, update its ttl, to see if it will be deleted
-	_, err = s.Create("/foo/foo", "", false, Permanent, 3, 1)
+	_, err = s.Create("/foo/foo", "", false, false, Permanent, 3, 1)
 
 	if err != nil {
 		t.Fatalf("cannot create dir [%s] [%s]", "/foo/foo", err.Error())
 	}
 
-	_, err = s.Create("/foo/foo/foo1", "bar1", false, Permanent, 4, 1)
+	_, err = s.Create("/foo/foo/foo1", "bar1", false, false, Permanent, 4, 1)
 
 	if err != nil {
 		t.Fatal("cannot create [%s]", err.Error())
 	}
 
-	_, err = s.Create("/foo/foo/foo2", "", false, Permanent, 5, 1)
+	_, err = s.Create("/foo/foo/foo2", "", false, false, Permanent, 5, 1)
 	if err != nil {
 		t.Fatal("cannot create [%s]", err.Error())
 	}
 
-	_, err = s.Create("/foo/foo/foo2/boo", "boo1", false, Permanent, 6, 1)
+	_, err = s.Create("/foo/foo/foo2/boo", "boo1", false, false, Permanent, 6, 1)
 	if err != nil {
 		t.Fatal("cannot create [%s]", err.Error())
 	}
@@ -158,11 +158,11 @@ func TestListDirectory(t *testing.T) {
 
 	// create dir /foo
 	// set key-value /foo/foo=bar
-	s.Create("/foo/foo", "bar", false, Permanent, 1, 1)
+	s.Create("/foo/foo", "bar", false, false, Permanent, 1, 1)
 
 	// create dir /foo/fooDir
 	// set key-value /foo/fooDir/foo=bar
-	s.Create("/foo/fooDir/foo", "bar", false, Permanent, 2, 1)
+	s.Create("/foo/fooDir/foo", "bar", false, false, Permanent, 2, 1)
 
 	e, err := s.Get("/foo", true, false, 2, 1)
 
@@ -189,7 +189,7 @@ func TestListDirectory(t *testing.T) {
 
 	// create dir /foo/_hidden
 	// set key-value /foo/_hidden/foo -> bar
-	s.Create("/foo/_hidden/foo", "bar", false, Permanent, 3, 1)
+	s.Create("/foo/_hidden/foo", "bar", false, false, Permanent, 3, 1)
 
 	e, _ = s.Get("/foo", false, false, 2, 1)
 
@@ -201,7 +201,7 @@ func TestListDirectory(t *testing.T) {
 func TestRemove(t *testing.T) {
 	s := New()
 
-	s.Create("/foo", "bar", false, Permanent, 1, 1)
+	s.Create("/foo", "bar", false, false, Permanent, 1, 1)
 	_, err := s.Delete("/foo", false, 1, 1)
 
 	if err != nil {
@@ -214,9 +214,9 @@ func TestRemove(t *testing.T) {
 		t.Fatalf("can get the node after deletion")
 	}
 
-	s.Create("/foo/bar", "bar", false, Permanent, 1, 1)
-	s.Create("/foo/car", "car", false, Permanent, 1, 1)
-	s.Create("/foo/dar/dar", "dar", false, Permanent, 1, 1)
+	s.Create("/foo/bar", "bar", false, false, Permanent, 1, 1)
+	s.Create("/foo/car", "car", false, false, Permanent, 1, 1)
+	s.Create("/foo/dar/dar", "dar", false, false, Permanent, 1, 1)
 
 	_, err = s.Delete("/foo", false, 1, 1)
 
@@ -242,7 +242,7 @@ func TestExpire(t *testing.T) {
 
 	expire := time.Now().Add(time.Second)
 
-	s.Create("/foo", "bar", false, expire, 1, 1)
+	s.Create("/foo", "bar", false, false, expire, 1, 1)
 
 	_, err := s.Get("/foo", false, false, 1, 1)
 
@@ -260,7 +260,7 @@ func TestExpire(t *testing.T) {
 
 	// test if we can reach the node before expiration
 	expire = time.Now().Add(time.Second)
-	s.Create("/foo", "bar", false, expire, 1, 1)
+	s.Create("/foo", "bar", false, false, expire, 1, 1)
 
 	time.Sleep(time.Millisecond * 50)
 	_, err = s.Get("/foo", false, false, 1, 1)
@@ -271,7 +271,7 @@ func TestExpire(t *testing.T) {
 
 	expire = time.Now().Add(time.Second)
 
-	s.Create("/foo", "bar", false, expire, 1, 1)
+	s.Create("/foo", "bar", false, false, expire, 1, 1)
 	_, err = s.Delete("/foo", false, 1, 1)
 
 	if err != nil {
@@ -281,7 +281,7 @@ func TestExpire(t *testing.T) {
 
 func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
 	s := New()
-	s.Create("/foo", "bar", false, Permanent, 1, 1)
+	s.Create("/foo", "bar", false, false, Permanent, 1, 1)
 
 	// test on wrong previous value
 	_, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1)
@@ -316,7 +316,7 @@ func TestWatch(t *testing.T) {
 	s := New()
 	// watch at a deeper path
 	c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1)
-	s.Create("/foo/foo/foo", "bar", false, Permanent, 1, 1)
+	s.Create("/foo/foo/foo", "bar", false, false, Permanent, 1, 1)
 
 	e := nonblockingRetrive(c)
 	if e.Key != "/foo/foo/foo" || e.Action != Create {
@@ -346,7 +346,7 @@ func TestWatch(t *testing.T) {
 
 	// watch at a prefix
 	c, _ = s.Watch("/foo", true, 0, 4, 1)
-	s.Create("/foo/foo/boo", "bar", false, Permanent, 5, 1)
+	s.Create("/foo/foo/boo", "bar", false, false, Permanent, 5, 1)
 	e = nonblockingRetrive(c)
 	if e.Key != "/foo/foo/boo" || e.Action != Create {
 		t.Fatal("watch for Create subdirectory fails")
@@ -374,7 +374,7 @@ func TestWatch(t *testing.T) {
 	}
 
 	// watch expire
-	s.Create("/foo/foo/boo", "foo", false, time.Now().Add(time.Second*1), 9, 1)
+	s.Create("/foo/foo/boo", "foo", false, false, time.Now().Add(time.Second*1), 9, 1)
 	c, _ = s.Watch("/foo", true, 0, 9, 1)
 	time.Sleep(time.Second * 2)
 	e = nonblockingRetrive(c)
@@ -382,7 +382,7 @@ func TestWatch(t *testing.T) {
 		t.Fatal("watch for Expiration of Create() subdirectory fails ", e)
 	}
 
-	s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1)
+	s.Create("/foo/foo/boo", "foo", false, false, Permanent, 10, 1)
 	s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
 	c, _ = s.Watch("/foo", true, 0, 11, 1)
 	time.Sleep(time.Second * 2)
@@ -391,7 +391,7 @@ func TestWatch(t *testing.T) {
 		t.Fatal("watch for Expiration of Update() subdirectory fails ", e)
 	}
 
-	s.Create("/foo/foo/boo", "foo", false, Permanent, 12, 1)
+	s.Create("/foo/foo/boo", "foo", false, false, Permanent, 12, 1)
 	s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1)
 	c, _ = s.Watch("/foo", true, 0, 13, 1)
 	time.Sleep(time.Second * 2)
@@ -409,7 +409,7 @@ func TestSort(t *testing.T) {
 
 	i := uint64(1)
 	for _, k := range keys {
-		_, err := s.Create(k, "bar", false, Permanent, i, 1)
+		_, err := s.Create(k, "bar", false, false, Permanent, i, 1)
 		if err != nil {
 			panic(err)
 		} else {
@@ -447,7 +447,7 @@ func TestSaveAndRecover(t *testing.T) {
 
 	i := uint64(1)
 	for _, k := range keys {
-		_, err := s.Create(k, "bar", false, Permanent, i, 1)
+		_, err := s.Create(k, "bar", false, false, Permanent, i, 1)
 		if err != nil {
 			panic(err)
 		} else {
@@ -459,7 +459,7 @@ func TestSaveAndRecover(t *testing.T) {
 	// test if we can reach the node before expiration
 
 	expire := time.Now().Add(time.Second)
-	s.Create("/foo/foo", "bar", false, expire, 1, 1)
+	s.Create("/foo/foo", "bar", false, false, expire, 1, 1)
 	b, err := s.Save()
 
 	cloneFs := New()
@@ -479,7 +479,8 @@ func TestSaveAndRecover(t *testing.T) {
 	defer s.worldLock.RUnlock()
 
 	if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex {
-		t.Fatal("Error recovered event history start index")
+		t.Fatalf("Error recovered event history start index[%v/%v]",
+			s.WatcherHub.EventHistory.StartIndex, cloneFs.WatcherHub.EventHistory.StartIndex)
 	}
 
 	for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ {
@@ -514,7 +515,7 @@ func GenKeys(num int, depth int) []string {
 }
 
 func createAndGet(s *Store, path string, t *testing.T) {
-	_, err := s.Create(path, "bar", false, Permanent, 1, 1)
+	_, err := s.Create(path, "bar", false, false, Permanent, 1, 1)
 
 	if err != nil {
 		t.Fatalf("cannot create %s=bar [%s]", path, err.Error())

+ 3 - 1
store/watcher.go

@@ -5,6 +5,8 @@ import (
 	"path"
 	"strings"
 	"sync/atomic"
+
+	etcdErr "github.com/coreos/etcd/error"
 )
 
 type watcherHub struct {
@@ -30,7 +32,7 @@ func newWatchHub(capacity int) *watcherHub {
 // If recursive is true, the first change after index under prefix will be sent to the event channel.
 // If recursive is false, the first change after index at prefix will be sent to the event channel.
 // If index is zero, watch will start from the current index + 1.
-func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, error) {
+func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
 	eventChan := make(chan *Event, 1)
 
 	e, err := wh.EventHistory.scan(prefix, index)

+ 23 - 8
util.go

@@ -69,24 +69,40 @@ func startWebInterface() {
 
 func dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
 	if r.State() == raft.Leader {
-		if body, err := r.Do(c); err != nil {
+		if response, err := r.Do(c); err != nil {
 			return err
 		} else {
-			if body == nil {
-				return etcdErr.NewError(300, "Empty result from raft")
-			} else {
-				body, _ := body.([]byte)
+			if response == nil {
+				return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
+			}
+
+			event, ok := response.(*store.Event)
+			if ok {
+				bytes, err := json.Marshal(event)
+				if err != nil {
+					fmt.Println(err)
+				}
+
+				w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
+				w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
 				w.WriteHeader(http.StatusOK)
-				w.Write(body)
+				w.Write(bytes)
+
 				return nil
 			}
+
+			bytes, _ := response.([]byte)
+			w.WriteHeader(http.StatusOK)
+			w.Write(bytes)
+
+			return nil
 		}
 
 	} else {
 		leader := r.Leader()
 		// current no leader
 		if leader == "" {
-			return etcdErr.NewError(300, "")
+			return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 		}
 		url, _ := toURL(leader)
 
@@ -94,7 +110,6 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(na
 
 		return nil
 	}
-	return etcdErr.NewError(300, "")
 }
 
 func redirect(hostname string, w http.ResponseWriter, req *http.Request) {

+ 1 - 1
version.go

@@ -1,6 +1,6 @@
 package main
 
-const version = "v1"
+const version = "v2"
 
 // TODO: The release version (generated from the git tag) will be the raft
 // protocol version for now. When things settle down we will fix it like the