Browse Source

Merge pull request #376 from xiangli-cmu/dir_flag

feat add dir_flag
Xiang Li 12 years ago
parent
commit
dd354c9e22

+ 7 - 5
error/error.go

@@ -33,6 +33,7 @@ const (
 	EcodeNodeExist      = 105
 	EcodeKeyIsPreserved = 106
 	EcodeRootROnly      = 107
+	EcodeDirNotEmpty    = 108
 
 	EcodeValueRequired      = 200
 	EcodePrevValueRequired  = 201
@@ -51,14 +52,15 @@ func init() {
 	errors = make(map[int]string)
 
 	// command related errors
-	errors[EcodeKeyNotFound] = "Key Not Found"
-	errors[EcodeTestFailed] = "Test Failed" //test and set
-	errors[EcodeNotFile] = "Not A File"
+	errors[EcodeKeyNotFound] = "Key not found"
+	errors[EcodeTestFailed] = "Compare failed" //test and set
+	errors[EcodeNotFile] = "Not a file"
 	errors[EcodeNoMorePeer] = "Reached the max number of peers in the cluster"
-	errors[EcodeNotDir] = "Not A Directory"
-	errors[EcodeNodeExist] = "Already exists" // create
+	errors[EcodeNotDir] = "Not a directory"
+	errors[EcodeNodeExist] = "Key already exists" // create
 	errors[EcodeRootROnly] = "Root is read only"
 	errors[EcodeKeyIsPreserved] = "The prefix of given key is a keyword in etcd"
+	errors[EcodeDirNotEmpty] = "Directory not empty"
 
 	// Post form related errors
 	errors[EcodeValueRequired] = "Value is Required in POST form"

+ 2 - 2
server/registry.go

@@ -45,7 +45,7 @@ func (r *Registry) Register(name string, peerURL string, url string) error {
 	// Write data to store.
 	key := path.Join(RegistryKey, name)
 	value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url)
-	_, err := r.store.Create(key, value, false, store.Permanent)
+	_, err := r.store.Create(key, false, value, false, store.Permanent)
 	log.Debugf("Register: %s", name)
 	return err
 }
@@ -59,7 +59,7 @@ func (r *Registry) Unregister(name string) error {
 	// delete(r.nodes, name)
 
 	// Remove the key from the store.
-	_, err := r.store.Delete(path.Join(RegistryKey, name), false)
+	_, err := r.store.Delete(path.Join(RegistryKey, name), false, false)
 	log.Debugf("Unregister: %s", name)
 	return err
 }

+ 9 - 9
server/server.go

@@ -46,14 +46,14 @@ func New(name string, urlStr string, bindAddr string, tlsConf *TLSConfig, tlsInf
 			TLSConfig: &tlsConf.Server,
 			Addr:      bindAddr,
 		},
-		name:       name,
-		store:      store,
-		registry:   registry,
-		url:        urlStr,
-		tlsConf:    tlsConf,
-		tlsInfo:    tlsInfo,
-		peerServer: peerServer,
-		router:     r,
+		name:        name,
+		store:       store,
+		registry:    registry,
+		url:         urlStr,
+		tlsConf:     tlsConf,
+		tlsInfo:     tlsInfo,
+		peerServer:  peerServer,
+		router:      r,
 		corsHandler: cors,
 	}
 
@@ -377,7 +377,7 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
 	for i := 0; i < count; i++ {
 		go func() {
 			for j := 0; j < 10; j++ {
-				c := s.Store().CommandFactory().CreateSetCommand("foo", "bar", time.Unix(0, 0))
+				c := s.Store().CommandFactory().CreateSetCommand("foo", false, "bar", time.Unix(0, 0))
 				s.peerServer.RaftServer().Do(c)
 			}
 			c <- true

+ 1 - 1
server/v1/delete_key_handler.go

@@ -9,6 +9,6 @@ import (
 func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	vars := mux.Vars(req)
 	key := "/" + vars["key"]
-	c := s.Store().CommandFactory().CreateDeleteCommand(key, false)
+	c := s.Store().CommandFactory().CreateDeleteCommand(key, false, false)
 	return s.Dispatch(c, w, req)
 }

+ 2 - 2
server/v1/set_key_handler.go

@@ -36,11 +36,11 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 			c = s.Store().CommandFactory().CreateCompareAndSwapCommand(key, value, prevValueArr[0], 0, expireTime)
 		} else {
 			// test against existence
-			c = s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, false)
+			c = s.Store().CommandFactory().CreateCreateCommand(key, false, value, expireTime, false)
 		}
 
 	} else {
-		c = s.Store().CommandFactory().CreateSetCommand(key, value, expireTime)
+		c = s.Store().CommandFactory().CreateSetCommand(key, false, value, expireTime)
 	}
 
 	return s.Dispatch(c, w, req)

+ 3 - 1
server/v2/delete_handler.go

@@ -9,8 +9,10 @@ import (
 func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	vars := mux.Vars(req)
 	key := "/" + vars["key"]
+
 	recursive := (req.FormValue("recursive") == "true")
+	dir := (req.FormValue("dir") == "true")
 
-	c := s.Store().CommandFactory().CreateDeleteCommand(key, recursive)
+	c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive)
 	return s.Dispatch(c, w, req)
 }

+ 2 - 1
server/v2/post_handler.go

@@ -13,11 +13,12 @@ func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	key := "/" + vars["key"]
 
 	value := req.FormValue("value")
+	dir := (req.FormValue("dir") == "true")
 	expireTime, err := store.TTL(req.FormValue("ttl"))
 	if err != nil {
 		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
 	}
 
-	c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, true)
+	c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, true)
 	return s.Dispatch(c, w, req)
 }

+ 11 - 9
server/v2/put_handler.go

@@ -20,23 +20,25 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	req.ParseForm()
 
 	value := req.Form.Get("value")
+	dir := (req.FormValue("dir") == "true")
+
 	expireTime, err := store.TTL(req.Form.Get("ttl"))
 	if err != nil {
 		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store().Index())
 	}
 
 	_, valueOk := req.Form["prevValue"]
-	prevValue := req.Form.Get("prevValue")
+	prevValue := req.FormValue("prevValue")
 
 	_, indexOk := req.Form["prevIndex"]
-	prevIndexStr := req.Form.Get("prevIndex")
+	prevIndexStr := req.FormValue("prevIndex")
 
 	_, existOk := req.Form["prevExist"]
-	prevExist := req.Form.Get("prevExist")
+	prevExist := req.FormValue("prevExist")
 
 	// Set handler: create a new node or replace the old one.
 	if !valueOk && !indexOk && !existOk {
-		return SetHandler(w, req, s, key, value, expireTime)
+		return SetHandler(w, req, s, key, dir, value, expireTime)
 	}
 
 	// update with test
@@ -44,7 +46,7 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		if prevExist == "false" {
 			// Create command: create a new node. Fail, if a node already exists
 			// Ignore prevIndex and prevValue
-			return CreateHandler(w, req, s, key, value, expireTime)
+			return CreateHandler(w, req, s, key, dir, value, expireTime)
 		}
 
 		if prevExist == "true" && !indexOk && !valueOk {
@@ -75,13 +77,13 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	return s.Dispatch(c, w, req)
 }
 
-func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
-	c := s.Store().CommandFactory().CreateSetCommand(key, value, expireTime)
+func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error {
+	c := s.Store().CommandFactory().CreateSetCommand(key, dir, value, expireTime)
 	return s.Dispatch(c, w, req)
 }
 
-func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
-	c := s.Store().CommandFactory().CreateCreateCommand(key, value, expireTime, false)
+func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error {
+	c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, false)
 	return s.Dispatch(c, w, req)
 }
 

+ 56 - 0
server/v2/tests/delete_handler_test.go

@@ -27,3 +27,59 @@ func TestV2DeleteKey(t *testing.T) {
 		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2}}`, "")
 	})
 }
+
+// Ensures that an empty directory is deleted when dir is set.
+//
+//   $ curl -X PUT localhost:4001/v2/keys/foo?dir=true
+//   $ curl -X PUT localhost:4001/v2/keys/foo ->fail
+//   $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true
+//
+func TestV2DeleteEmptyDirectory(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
+		tests.ReadBody(resp)
+		resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), url.Values{})
+		bodyJson := tests.ReadBodyJSON(resp)
+		assert.Equal(t, bodyJson["errorCode"], 102, "")
+		resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
+		body := tests.ReadBody(resp)
+		assert.Nil(t, err, "")
+		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
+	})
+}
+
+// Ensures that a not-empty directory is deleted when dir is set.
+//
+//   $ curl -X PUT localhost:4001/v2/keys/foo/bar?dir=true
+//   $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true ->fail
+//   $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true&recursive=true
+//
+func TestV2DeleteNonEmptyDirectory(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?dir=true"), url.Values{})
+		tests.ReadBody(resp)
+		resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
+		bodyJson := tests.ReadBodyJSON(resp)
+		assert.Equal(t, bodyJson["errorCode"], 108, "")
+		resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true&recursive=true"), url.Values{})
+		body := tests.ReadBody(resp)
+		assert.Nil(t, err, "")
+		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
+	})
+}
+
+// Ensures that a directory is deleted when recursive is set.
+//
+//   $ curl -X PUT localhost:4001/v2/keys/foo?dir=true
+//   $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true
+//
+func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
+		tests.ReadBody(resp)
+		resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"), url.Values{})
+		body := tests.ReadBody(resp)
+		assert.Nil(t, err, "")
+		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2}}`, "")
+	})
+}

+ 1 - 1
server/v2/tests/post_handler_test.go

@@ -24,7 +24,7 @@ func TestV2CreateUnique(t *testing.T) {
 
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["key"], "/foo/bar/2", "")
-		assert.Equal(t, node["dir"], true, "")
+		assert.Nil(t, node["dir"], "")
 		assert.Equal(t, node["modifiedIndex"], 2, "")
 
 		// Second POST should add next index to list.

+ 20 - 6
server/v2/tests/put_handler_test.go

@@ -26,6 +26,19 @@ func TestV2SetKey(t *testing.T) {
 	})
 }
 
+// Ensures that a directory is created
+//
+//   $ curl -X PUT localhost:4001/v2/keys/foo/bar?dir=true
+//
+func TestV2SetDirectory(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), url.Values{})
+		body := tests.ReadBody(resp)
+		assert.Nil(t, err, "")
+		assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
+	})
+}
+
 // Ensures that a time-to-live is added to a key.
 //
 //   $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d ttl=20
@@ -95,7 +108,7 @@ func TestV2CreateKeyFail(t *testing.T) {
 		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 105, "")
-		assert.Equal(t, body["message"], "Already exists", "")
+		assert.Equal(t, body["message"], "Key already exists", "")
 		assert.Equal(t, body["cause"], "/foo/bar", "")
 	})
 }
@@ -123,19 +136,20 @@ func TestV2UpdateKeySuccess(t *testing.T) {
 
 // Ensures that a key is not conditionally set if it previously did not exist.
 //
+//   $ curl -X PUT localhost:4001/v2/keys/foo?dir=true
 //   $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=true
 //
 func TestV2UpdateKeyFailOnValue(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
-		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?dir=true"), v)
 
 		v.Set("value", "YYY")
 		v.Set("prevExist", "true")
 		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 100, "")
-		assert.Equal(t, body["message"], "Key Not Found", "")
+		assert.Equal(t, body["message"], "Key not found", "")
 		assert.Equal(t, body["cause"], "/foo/bar", "")
 	})
 }
@@ -153,7 +167,7 @@ func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) {
 		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 100, "")
-		assert.Equal(t, body["message"], "Key Not Found", "")
+		assert.Equal(t, body["message"], "Key not found", "")
 		assert.Equal(t, body["cause"], "/foo", "")
 	})
 }
@@ -196,7 +210,7 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
 		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 101, "")
-		assert.Equal(t, body["message"], "Test Failed", "")
+		assert.Equal(t, body["message"], "Compare failed", "")
 		assert.Equal(t, body["cause"], "[ != XXX] [10 != 2]", "")
 		assert.Equal(t, body["index"], 2, "")
 	})
@@ -257,7 +271,7 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
 		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 101, "")
-		assert.Equal(t, body["message"], "Test Failed", "")
+		assert.Equal(t, body["message"], "Compare failed", "")
 		assert.Equal(t, body["cause"], "[AAA != XXX] [0 != 2]", "")
 		assert.Equal(t, body["index"], 2, "")
 	})

+ 5 - 4
store/command_factory.go

@@ -16,11 +16,12 @@ var minVersion, maxVersion int
 type CommandFactory interface {
 	Version() int
 	CreateUpgradeCommand() raft.Command
-	CreateSetCommand(key string, value string, expireTime time.Time) raft.Command
-	CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command
+	CreateSetCommand(key string, dir bool, value string, expireTime time.Time) raft.Command
+	CreateCreateCommand(key string, dir bool, value string, expireTime time.Time, unique bool) raft.Command
 	CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command
-	CreateDeleteCommand(key string, recursive bool) raft.Command
-	CreateCompareAndSwapCommand(key string, value string, prevValue string, prevIndex uint64, expireTime time.Time) raft.Command
+	CreateDeleteCommand(key string, dir, recursive bool) raft.Command
+	CreateCompareAndSwapCommand(key string, value string, prevValue string,
+		prevIndex uint64, expireTime time.Time) raft.Command
 	CreateSyncCommand(now time.Time) raft.Command
 }
 

+ 13 - 5
store/node.go

@@ -175,11 +175,19 @@ func (n *node) Add(child *node) *etcdErr.Error {
 }
 
 // Remove function remove the node.
-func (n *node) Remove(recursive bool, callback func(path string)) *etcdErr.Error {
+func (n *node) Remove(dir, recursive bool, callback func(path string)) *etcdErr.Error {
 
-	if n.IsDir() && !recursive {
-		// cannot delete a directory without set recursive to true
-		return etcdErr.NewError(etcdErr.EcodeNotFile, "", n.store.Index())
+	if n.IsDir() {
+		if !dir {
+			// cannot delete a directory without recursive set to true
+			return etcdErr.NewError(etcdErr.EcodeNotFile, n.Path, n.store.Index())
+		}
+
+		if len(n.Children) != 0 && !recursive {
+			// cannot delete a directory if it is not empty and the operation
+			// is not recursive
+			return etcdErr.NewError(etcdErr.EcodeDirNotEmpty, n.Path, n.store.Index())
+		}
 	}
 
 	if !n.IsDir() { // key-value pair
@@ -202,7 +210,7 @@ func (n *node) Remove(recursive bool, callback func(path string)) *etcdErr.Error
 	}
 
 	for _, child := range n.Children { // delete all children
-		child.Remove(true, callback)
+		child.Remove(true, true, callback)
 	}
 
 	// delete self

+ 12 - 12
store/stats_test.go

@@ -10,7 +10,7 @@ import (
 // Ensure that a successful Get is recorded in the stats.
 func TestStoreStatsGetSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	s.Get("/foo", false, false)
 	assert.Equal(t, uint64(1), s.Stats.GetSuccess, "")
 }
@@ -18,7 +18,7 @@ func TestStoreStatsGetSuccess(t *testing.T) {
 // Ensure that a failed Get is recorded in the stats.
 func TestStoreStatsGetFail(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	s.Get("/no_such_key", false, false)
 	assert.Equal(t, uint64(1), s.Stats.GetFail, "")
 }
@@ -26,22 +26,22 @@ func TestStoreStatsGetFail(t *testing.T) {
 // Ensure that a successful Create is recorded in the stats.
 func TestStoreStatsCreateSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CreateSuccess, "")
 }
 
 // Ensure that a failed Create is recorded in the stats.
 func TestStoreStatsCreateFail(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent)
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", true, "", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CreateFail, "")
 }
 
 // Ensure that a successful Update is recorded in the stats.
 func TestStoreStatsUpdateSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	s.Update("/foo", "baz", Permanent)
 	assert.Equal(t, uint64(1), s.Stats.UpdateSuccess, "")
 }
@@ -56,7 +56,7 @@ func TestStoreStatsUpdateFail(t *testing.T) {
 // Ensure that a successful CAS is recorded in the stats.
 func TestStoreStatsCompareAndSwapSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CompareAndSwapSuccess, "")
 }
@@ -64,7 +64,7 @@ func TestStoreStatsCompareAndSwapSuccess(t *testing.T) {
 // Ensure that a failed CAS is recorded in the stats.
 func TestStoreStatsCompareAndSwapFail(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
 	assert.Equal(t, uint64(1), s.Stats.CompareAndSwapFail, "")
 }
@@ -72,15 +72,15 @@ func TestStoreStatsCompareAndSwapFail(t *testing.T) {
 // Ensure that a successful Delete is recorded in the stats.
 func TestStoreStatsDeleteSuccess(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
-	s.Delete("/foo", false)
+	s.Create("/foo", false, "bar", false, Permanent)
+	s.Delete("/foo", false, false)
 	assert.Equal(t, uint64(1), s.Stats.DeleteSuccess, "")
 }
 
 // Ensure that a failed Delete is recorded in the stats.
 func TestStoreStatsDeleteFail(t *testing.T) {
 	s := newStore()
-	s.Delete("/foo", false)
+	s.Delete("/foo", false, false)
 	assert.Equal(t, uint64(1), s.Stats.DeleteFail, "")
 }
 
@@ -94,7 +94,7 @@ func TestStoreStatsExpireCount(t *testing.T) {
 	}()
 
 	go mockSyncService(s.DeleteExpiredKeys, c)
-	s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond))
+	s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
 	assert.Equal(t, uint64(0), s.Stats.ExpireCount, "")
 	time.Sleep(600 * time.Millisecond)
 	assert.Equal(t, uint64(1), s.Stats.ExpireCount, "")

+ 32 - 31
store/store.go

@@ -42,17 +42,20 @@ type Store interface {
 	Version() int
 	CommandFactory() CommandFactory
 	Index() uint64
+
 	Get(nodePath string, recursive, sorted bool) (*Event, error)
-	Set(nodePath string, value string, expireTime time.Time) (*Event, error)
+	Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
 	Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
-	Create(nodePath string, value string, incrementalSuffix bool,
+	Create(nodePath string, dir bool, value string, unique bool,
 		expireTime time.Time) (*Event, error)
 	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
 		value string, expireTime time.Time) (*Event, error)
-	Delete(nodePath string, recursive bool) (*Event, error)
+	Delete(nodePath string, recursive, dir bool) (*Event, error)
 	Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
+
 	Save() ([]byte, error)
 	Recovery(state []byte) error
+
 	TotalTransactions() uint64
 	JsonStats() []byte
 	DeleteExpiredKeys(cutoff time.Time)
@@ -156,10 +159,10 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
 // Create function creates the node at nodePath. Create will help to create intermediate directories with no ttl.
 // If the node has already existed, create will fail.
 // If any node on the path is a file, create will fail.
-func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
+func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
-	e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
+	e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
 
 	if err == nil {
 		s.Stats.Inc(CreateSuccess)
@@ -171,10 +174,10 @@ func (s *store) Create(nodePath string, value string, unique bool, expireTime ti
 }
 
 // Set function creates or replace the node at nodePath.
-func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
+func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
-	e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
+	e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
 
 	if err == nil {
 		s.Stats.Inc(SetSuccess)
@@ -239,7 +242,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 
 // Delete function deletes the node at the given path.
 // If the node is a directory, recursive must be true to delete it.
-func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
+func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 	nodePath = path.Clean(path.Join("/", nodePath))
 	// we do not allow the user to change "/"
 	if nodePath == "/" {
@@ -249,6 +252,11 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 
+	// recursive implies dir
+	if recursive == true {
+		dir = true
+	}
+
 	nextIndex := s.CurrentIndex + 1
 
 	n, err := s.internalGet(nodePath)
@@ -272,7 +280,7 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
 		s.WatcherHub.notifyWatchers(e, path, true)
 	}
 
-	err = n.Remove(recursive, callback)
+	err = n.Remove(dir, recursive, callback)
 
 	if err != nil {
 		s.Stats.Inc(DeleteFail)
@@ -363,22 +371,16 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
 	e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
 	eNode := e.Node
 
-	if len(newValue) != 0 {
-		if n.IsDir() {
-			// if the node is a directory, we cannot update value
-			s.Stats.Inc(UpdateFail)
-			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
-		}
-
-		eNode.PrevValue = n.Value
-		n.Write(newValue, nextIndex)
-		eNode.Value = newValue
-
-	} else {
-		// do not update value
-		eNode.Value = n.Value
+	if n.IsDir() && len(newValue) != 0 {
+		// if the node is a directory, we cannot update value to non-empty
+		s.Stats.Inc(UpdateFail)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
 	}
 
+	eNode.PrevValue = n.Value
+	n.Write(newValue, nextIndex)
+	eNode.Value = newValue
+
 	// update ttl
 	n.UpdateTTL(expireTime)
 
@@ -393,7 +395,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
 	return e, nil
 }
 
-func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
+func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
 	expireTime time.Time, action string) (*Event, error) {
 
 	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
@@ -415,10 +417,10 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 		expireTime = Permanent
 	}
 
-	dir, newNodeName := path.Split(nodePath)
+	dirName, nodeName := path.Split(nodePath)
 
 	// walk through the nodePath, create dirs and get the last directory node
-	d, err := s.walk(dir, s.checkDir)
+	d, err := s.walk(dirName, s.checkDir)
 
 	if err != nil {
 		s.Stats.Inc(SetFail)
@@ -429,7 +431,7 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 	e := newEvent(action, nodePath, nextIndex, nextIndex)
 	eNode := e.Node
 
-	n, _ := d.GetChild(newNodeName)
+	n, _ := d.GetChild(nodeName)
 
 	// force will try to replace a existing file
 	if n != nil {
@@ -439,13 +441,13 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 			}
 			eNode.PrevValue, _ = n.Read()
 
-			n.Remove(false, nil)
+			n.Remove(false, false, nil)
 		} else {
 			return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
 		}
 	}
 
-	if len(value) != 0 { // create file
+	if !dir { // create file
 		eNode.Value = value
 
 		n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
@@ -454,7 +456,6 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 		eNode.Dir = true
 
 		n = newDir(s, nodePath, nextIndex, d, "", expireTime)
-
 	}
 
 	// we are sure d is a directory and does not have the children with name n.Name
@@ -512,7 +513,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
 		}
 
 		s.ttlKeyHeap.pop()
-		node.Remove(true, nil)
+		node.Remove(true, true, nil)
 
 		s.CurrentIndex++
 

+ 163 - 67
store/store_test.go

@@ -27,7 +27,7 @@ import (
 // Ensure that the store can retrieve an existing value.
 func TestStoreGetValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	e, err := s.Get("/foo", false, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "get", "")
@@ -39,13 +39,13 @@ func TestStoreGetValue(t *testing.T) {
 // Note that hidden files should not be returned.
 func TestStoreGetDirectory(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent)
-	s.Create("/foo/bar", "X", false, Permanent)
-	s.Create("/foo/_hidden", "*", false, Permanent)
-	s.Create("/foo/baz", "", false, Permanent)
-	s.Create("/foo/baz/bat", "Y", false, Permanent)
-	s.Create("/foo/baz/_hidden", "*", false, Permanent)
-	s.Create("/foo/baz/ttl", "Y", false, time.Now().Add(time.Second*3))
+	s.Create("/foo", true, "", false, Permanent)
+	s.Create("/foo/bar", false, "X", false, Permanent)
+	s.Create("/foo/_hidden", false, "*", false, Permanent)
+	s.Create("/foo/baz", true, "", false, Permanent)
+	s.Create("/foo/baz/bat", false, "Y", false, Permanent)
+	s.Create("/foo/baz/_hidden", false, "*", false, Permanent)
+	s.Create("/foo/baz/ttl", false, "Y", false, time.Now().Add(time.Second*3))
 	e, err := s.Get("/foo", true, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "get", "")
@@ -69,12 +69,12 @@ func TestStoreGetDirectory(t *testing.T) {
 // Ensure that the store can retrieve a directory in sorted order.
 func TestStoreGetSorted(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent)
-	s.Create("/foo/x", "0", false, Permanent)
-	s.Create("/foo/z", "0", false, Permanent)
-	s.Create("/foo/y", "", false, Permanent)
-	s.Create("/foo/y/a", "0", false, Permanent)
-	s.Create("/foo/y/b", "0", false, Permanent)
+	s.Create("/foo", true, "", false, Permanent)
+	s.Create("/foo/x", false, "0", false, Permanent)
+	s.Create("/foo/z", false, "0", false, Permanent)
+	s.Create("/foo/y", true, "", false, Permanent)
+	s.Create("/foo/y/a", false, "0", false, Permanent)
+	s.Create("/foo/y/b", false, "0", false, Permanent)
 	e, err := s.Get("/foo", true, true)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Node.Nodes[0].Key, "/foo/x", "")
@@ -84,10 +84,54 @@ func TestStoreGetSorted(t *testing.T) {
 	assert.Equal(t, e.Node.Nodes[2].Key, "/foo/z", "")
 }
 
+func TestSet(t *testing.T) {
+	s := newStore()
+
+	// Set /foo=""
+	e, err := s.Set("/foo", false, "", Permanent)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "set", "")
+	assert.Equal(t, e.Node.Key, "/foo", "")
+	assert.False(t, e.Node.Dir, "")
+	assert.Equal(t, e.Node.PrevValue, "", "")
+	assert.Equal(t, e.Node.Value, "", "")
+	assert.Nil(t, e.Node.Nodes, "")
+	assert.Nil(t, e.Node.Expiration, "")
+	assert.Equal(t, e.Node.TTL, 0, "")
+	assert.Equal(t, e.Node.ModifiedIndex, uint64(1), "")
+
+	// Set /foo="bar"
+	e, err = s.Set("/foo", false, "bar", Permanent)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "set", "")
+	assert.Equal(t, e.Node.Key, "/foo", "")
+	assert.False(t, e.Node.Dir, "")
+	assert.Equal(t, e.Node.PrevValue, "", "")
+	assert.Equal(t, e.Node.Value, "bar", "")
+	assert.Nil(t, e.Node.Nodes, "")
+	assert.Nil(t, e.Node.Expiration, "")
+	assert.Equal(t, e.Node.TTL, 0, "")
+	assert.Equal(t, e.Node.ModifiedIndex, uint64(2), "")
+
+	// Set /dir as a directory
+	e, err = s.Set("/dir", true, "", Permanent)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "set", "")
+	assert.Equal(t, e.Node.Key, "/dir", "")
+	assert.True(t, e.Node.Dir, "")
+	assert.Equal(t, e.Node.PrevValue, "", "")
+	assert.Equal(t, e.Node.Value, "", "")
+	assert.Nil(t, e.Node.Nodes, "")
+	assert.Nil(t, e.Node.Expiration, "")
+	assert.Equal(t, e.Node.TTL, 0, "")
+	assert.Equal(t, e.Node.ModifiedIndex, uint64(3), "")
+}
+
 // Ensure that the store can create a new key if it doesn't already exist.
 func TestStoreCreateValue(t *testing.T) {
 	s := newStore()
-	e, err := s.Create("/foo", "bar", false, Permanent)
+	// Create /foo=bar
+	e, err := s.Create("/foo", false, "bar", false, Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
@@ -98,12 +142,26 @@ func TestStoreCreateValue(t *testing.T) {
 	assert.Nil(t, e.Node.Expiration, "")
 	assert.Equal(t, e.Node.TTL, 0, "")
 	assert.Equal(t, e.Node.ModifiedIndex, uint64(1), "")
+
+	// Create /empty=""
+	e, err = s.Create("/empty", false, "", false, Permanent)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "create", "")
+	assert.Equal(t, e.Node.Key, "/empty", "")
+	assert.False(t, e.Node.Dir, "")
+	assert.Equal(t, e.Node.PrevValue, "", "")
+	assert.Equal(t, e.Node.Value, "", "")
+	assert.Nil(t, e.Node.Nodes, "")
+	assert.Nil(t, e.Node.Expiration, "")
+	assert.Equal(t, e.Node.TTL, 0, "")
+	assert.Equal(t, e.Node.ModifiedIndex, uint64(2), "")
+
 }
 
 // Ensure that the store can create a new directory if it doesn't already exist.
 func TestStoreCreateDirectory(t *testing.T) {
 	s := newStore()
-	e, err := s.Create("/foo", "", false, Permanent)
+	e, err := s.Create("/foo", true, "", false, Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
@@ -113,11 +171,14 @@ func TestStoreCreateDirectory(t *testing.T) {
 // Ensure that the store fails to create a key if it already exists.
 func TestStoreCreateFailsIfExists(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent)
-	e, _err := s.Create("/foo", "", false, Permanent)
+	// create /foo as dir
+	s.Create("/foo", true, "", false, Permanent)
+
+	// create /foo as dir again
+	e, _err := s.Create("/foo", true, "", false, Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeNodeExist, "")
-	assert.Equal(t, err.Message, "Already exists", "")
+	assert.Equal(t, err.Message, "Key already exists", "")
 	assert.Equal(t, err.Cause, "/foo", "")
 	assert.Equal(t, err.Index, uint64(1), "")
 	assert.Nil(t, e, 0, "")
@@ -126,7 +187,9 @@ func TestStoreCreateFailsIfExists(t *testing.T) {
 // Ensure that the store can update a key if it already exists.
 func TestStoreUpdateValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	// create /foo=bar
+	s.Create("/foo", false, "bar", false, Permanent)
+	// update /foo="bzr"
 	e, err := s.Update("/foo", "baz", Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "update", "")
@@ -138,16 +201,29 @@ func TestStoreUpdateValue(t *testing.T) {
 	assert.Equal(t, e.Node.ModifiedIndex, uint64(2), "")
 	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Node.Value, "baz", "")
+
+	// update /foo=""
+	e, err = s.Update("/foo", "", Permanent)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "update", "")
+	assert.Equal(t, e.Node.Key, "/foo", "")
+	assert.False(t, e.Node.Dir, "")
+	assert.Equal(t, e.Node.PrevValue, "baz", "")
+	assert.Equal(t, e.Node.Value, "", "")
+	assert.Equal(t, e.Node.TTL, 0, "")
+	assert.Equal(t, e.Node.ModifiedIndex, uint64(3), "")
+	e, _ = s.Get("/foo", false, false)
+	assert.Equal(t, e.Node.Value, "", "")
 }
 
 // Ensure that the store cannot update a directory.
 func TestStoreUpdateFailsIfDirectory(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent)
+	s.Create("/foo", true, "", false, Permanent)
 	e, _err := s.Update("/foo", "baz", Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
-	assert.Equal(t, err.Message, "Not A File", "")
+	assert.Equal(t, err.Message, "Not a file", "")
 	assert.Equal(t, err.Cause, "/foo", "")
 	assert.Nil(t, e, "")
 }
@@ -162,7 +238,7 @@ func TestStoreUpdateValueTTL(t *testing.T) {
 	}()
 	go mockSyncService(s.DeleteExpiredKeys, c)
 
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	_, err := s.Update("/foo", "baz", time.Now().Add(500*time.Millisecond))
 	e, _ := s.Get("/foo", false, false)
 	assert.Equal(t, e.Node.Value, "baz", "")
@@ -183,8 +259,8 @@ func TestStoreUpdateDirTTL(t *testing.T) {
 	}()
 	go mockSyncService(s.DeleteExpiredKeys, c)
 
-	s.Create("/foo", "", false, Permanent)
-	s.Create("/foo/bar", "baz", false, Permanent)
+	s.Create("/foo", true, "", false, Permanent)
+	s.Create("/foo/bar", false, "baz", false, Permanent)
 	_, err := s.Update("/foo", "", time.Now().Add(500*time.Millisecond))
 	e, _ := s.Get("/foo/bar", false, false)
 	assert.Equal(t, e.Node.Value, "baz", "")
@@ -198,8 +274,8 @@ func TestStoreUpdateDirTTL(t *testing.T) {
 // Ensure that the store can delete a value.
 func TestStoreDeleteValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
-	e, err := s.Delete("/foo", false)
+	s.Create("/foo", false, "bar", false, Permanent)
+	e, err := s.Delete("/foo", false, false)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "delete", "")
 }
@@ -207,22 +283,53 @@ func TestStoreDeleteValue(t *testing.T) {
 // Ensure that the store can delete a directory if recursive is specified.
 func TestStoreDeleteDiretory(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent)
-	e, err := s.Delete("/foo", true)
+	// create directory /foo
+	s.Create("/foo", true, "", false, Permanent)
+	// delete /foo with dir = true and recursive = false
+	// this should succeed, since the directory is empty
+	e, err := s.Delete("/foo", true, false)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "delete", "")
+
+	// create directory /foo and directory /foo/bar
+	s.Create("/foo/bar", true, "", false, Permanent)
+	// delete /foo with dir = true and recursive = false
+	// this should fail, since the directory is not empty
+	_, err = s.Delete("/foo", true, false)
+	assert.NotNil(t, err, "")
+
+	// delete /foo with dir=false and recursive = true
+	// this should succeed, since recursive implies dir=true
+	// and recursively delete should be able to delete all
+	// items under the given directory
+	e, err = s.Delete("/foo", false, true)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "delete", "")
+
+}
+
+// Ensure that the store cannot delete a directory if both of recursive
+// and dir are not specified.
+func TestStoreDeleteDiretoryFailsIfNonRecursiveAndDir(t *testing.T) {
+	s := newStore()
+	s.Create("/foo", true, "", false, Permanent)
+	e, _err := s.Delete("/foo", false, false)
+	err := _err.(*etcdErr.Error)
+	assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
+	assert.Equal(t, err.Message, "Not a file", "")
+	assert.Nil(t, e, "")
 }
 
 func TestRootRdOnly(t *testing.T) {
 	s := newStore()
 
-	_, err := s.Set("/", "", Permanent)
+	_, err := s.Set("/", true, "", Permanent)
 	assert.NotNil(t, err, "")
 
-	_, err = s.Delete("/", true)
+	_, err = s.Delete("/", true, true)
 	assert.NotNil(t, err, "")
 
-	_, err = s.Create("/", "", false, Permanent)
+	_, err = s.Create("/", true, "", false, Permanent)
 	assert.NotNil(t, err, "")
 
 	_, err = s.Update("/", "", Permanent)
@@ -233,21 +340,10 @@ func TestRootRdOnly(t *testing.T) {
 
 }
 
-// Ensure that the store cannot delete a directory if recursive is not specified.
-func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
-	s := newStore()
-	s.Create("/foo", "", false, Permanent)
-	e, _err := s.Delete("/foo", false)
-	err := _err.(*etcdErr.Error)
-	assert.Equal(t, err.ErrorCode, etcdErr.EcodeNotFile, "")
-	assert.Equal(t, err.Message, "Not A File", "")
-	assert.Nil(t, e, "")
-}
-
 // Ensure that the store can conditionally update a key if it has a previous value.
 func TestStoreCompareAndSwapPrevValue(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
@@ -260,11 +356,11 @@ func TestStoreCompareAndSwapPrevValue(t *testing.T) {
 // Ensure that the store cannot conditionally update a key if it has the wrong previous value.
 func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	e, _err := s.CompareAndSwap("/foo", "wrong_value", 0, "baz", Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
-	assert.Equal(t, err.Message, "Test Failed", "")
+	assert.Equal(t, err.Message, "Compare failed", "")
 	assert.Nil(t, e, "")
 	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Node.Value, "bar", "")
@@ -273,7 +369,7 @@ func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
 // Ensure that the store can conditionally update a key if it has a previous index.
 func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	e, err := s.CompareAndSwap("/foo", "", 1, "baz", Permanent)
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "compareAndSwap", "")
@@ -286,11 +382,11 @@ func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
 // Ensure that the store cannot conditionally update a key if it has the wrong previous index.
 func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	e, _err := s.CompareAndSwap("/foo", "", 100, "baz", Permanent)
 	err := _err.(*etcdErr.Error)
 	assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
-	assert.Equal(t, err.Message, "Test Failed", "")
+	assert.Equal(t, err.Message, "Compare failed", "")
 	assert.Nil(t, e, "")
 	e, _ = s.Get("/foo", false, false)
 	assert.Equal(t, e.Node.Value, "bar", "")
@@ -300,7 +396,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
 func TestStoreWatchCreate(t *testing.T) {
 	s := newStore()
 	c, _ := s.Watch("/foo", false, 0)
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
@@ -312,7 +408,7 @@ func TestStoreWatchCreate(t *testing.T) {
 func TestStoreWatchRecursiveCreate(t *testing.T) {
 	s := newStore()
 	c, _ := s.Watch("/foo", true, 0)
-	s.Create("/foo/bar", "baz", false, Permanent)
+	s.Create("/foo/bar", false, "baz", false, Permanent)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "create", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
@@ -321,7 +417,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
 // Ensure that the store can watch for key updates.
 func TestStoreWatchUpdate(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	c, _ := s.Watch("/foo", false, 0)
 	s.Update("/foo", "baz", Permanent)
 	e := nbselect(c)
@@ -332,7 +428,7 @@ func TestStoreWatchUpdate(t *testing.T) {
 // Ensure that the store can watch for recursive key updates.
 func TestStoreWatchRecursiveUpdate(t *testing.T) {
 	s := newStore()
-	s.Create("/foo/bar", "baz", false, Permanent)
+	s.Create("/foo/bar", false, "baz", false, Permanent)
 	c, _ := s.Watch("/foo", true, 0)
 	s.Update("/foo/bar", "baz", Permanent)
 	e := nbselect(c)
@@ -343,9 +439,9 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
 // Ensure that the store can watch for key deletions.
 func TestStoreWatchDelete(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	c, _ := s.Watch("/foo", false, 0)
-	s.Delete("/foo", false)
+	s.Delete("/foo", false, false)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Node.Key, "/foo", "")
@@ -354,9 +450,9 @@ func TestStoreWatchDelete(t *testing.T) {
 // Ensure that the store can watch for recursive key deletions.
 func TestStoreWatchRecursiveDelete(t *testing.T) {
 	s := newStore()
-	s.Create("/foo/bar", "baz", false, Permanent)
+	s.Create("/foo/bar", false, "baz", false, Permanent)
 	c, _ := s.Watch("/foo", true, 0)
-	s.Delete("/foo/bar", false)
+	s.Delete("/foo/bar", false, false)
 	e := nbselect(c)
 	assert.Equal(t, e.Action, "delete", "")
 	assert.Equal(t, e.Node.Key, "/foo/bar", "")
@@ -365,7 +461,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
 // Ensure that the store can watch for CAS updates.
 func TestStoreWatchCompareAndSwap(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "bar", false, Permanent)
+	s.Create("/foo", false, "bar", false, Permanent)
 	c, _ := s.Watch("/foo", false, 0)
 	s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
 	e := nbselect(c)
@@ -376,7 +472,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
 // Ensure that the store can watch for recursive CAS updates.
 func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 	s := newStore()
-	s.Create("/foo/bar", "baz", false, Permanent)
+	s.Create("/foo/bar", false, "baz", false, Permanent)
 	c, _ := s.Watch("/foo", true, 0)
 	s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
 	e := nbselect(c)
@@ -394,8 +490,8 @@ func TestStoreWatchExpire(t *testing.T) {
 	}()
 	go mockSyncService(s.DeleteExpiredKeys, stopChan)
 
-	s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond))
-	s.Create("/foofoo", "barbarbar", false, time.Now().Add(500*time.Millisecond))
+	s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
+	s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
 
 	c, _ := s.Watch("/", true, 0)
 	e := nbselect(c)
@@ -413,9 +509,9 @@ func TestStoreWatchExpire(t *testing.T) {
 // Ensure that the store can recover from a previously saved state.
 func TestStoreRecover(t *testing.T) {
 	s := newStore()
-	s.Create("/foo", "", false, Permanent)
-	s.Create("/foo/x", "bar", false, Permanent)
-	s.Create("/foo/y", "baz", false, Permanent)
+	s.Create("/foo", true, "", false, Permanent)
+	s.Create("/foo/x", false, "bar", false, Permanent)
+	s.Create("/foo/y", false, "baz", false, Permanent)
 	b, err := s.Save()
 
 	s2 := newStore()
@@ -440,9 +536,9 @@ func TestStoreRecoverWithExpiration(t *testing.T) {
 	}()
 	go mockSyncService(s.DeleteExpiredKeys, c)
 
-	s.Create("/foo", "", false, Permanent)
-	s.Create("/foo/x", "bar", false, Permanent)
-	s.Create("/foo/y", "baz", false, time.Now().Add(5*time.Millisecond))
+	s.Create("/foo", true, "", false, Permanent)
+	s.Create("/foo/x", false, "bar", false, Permanent)
+	s.Create("/foo/y", false, "baz", false, time.Now().Add(5*time.Millisecond))
 	b, err := s.Save()
 
 	time.Sleep(10 * time.Millisecond)

+ 6 - 3
store/v2/command_factory.go

@@ -26,21 +26,23 @@ func (f *CommandFactory) CreateUpgradeCommand() raft.Command {
 }
 
 // CreateSetCommand creates a version 2 command to set a key to a given value in the store.
-func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime time.Time) raft.Command {
+func (f *CommandFactory) CreateSetCommand(key string, dir bool, value string, expireTime time.Time) raft.Command {
 	return &SetCommand{
 		Key:        key,
 		Value:      value,
 		ExpireTime: expireTime,
+		Dir:        dir,
 	}
 }
 
 // CreateCreateCommand creates a version 2 command to create a new key in the store.
-func (f *CommandFactory) CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command {
+func (f *CommandFactory) CreateCreateCommand(key string, dir bool, value string, expireTime time.Time, unique bool) raft.Command {
 	return &CreateCommand{
 		Key:        key,
 		Value:      value,
 		ExpireTime: expireTime,
 		Unique:     unique,
+		Dir:        dir,
 	}
 }
 
@@ -54,10 +56,11 @@ func (f *CommandFactory) CreateUpdateCommand(key string, value string, expireTim
 }
 
 // CreateDeleteCommand creates a version 2 command to delete a key from the store.
-func (f *CommandFactory) CreateDeleteCommand(key string, recursive bool) raft.Command {
+func (f *CommandFactory) CreateDeleteCommand(key string, dir, recursive bool) raft.Command {
 	return &DeleteCommand{
 		Key:       key,
 		Recursive: recursive,
+		Dir:       dir,
 	}
 }
 

+ 2 - 1
store/v2/create_command.go

@@ -18,6 +18,7 @@ type CreateCommand struct {
 	Value      string    `json:"value"`
 	ExpireTime time.Time `json:"expireTime"`
 	Unique     bool      `json:"unique"`
+	Dir        bool      `json:"dir"`
 }
 
 // The name of the create command in the log
@@ -29,7 +30,7 @@ func (c *CreateCommand) CommandName() string {
 func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
-	e, err := s.Create(c.Key, c.Value, c.Unique, c.ExpireTime)
+	e, err := s.Create(c.Key, c.Dir, c.Value, c.Unique, c.ExpireTime)
 
 	if err != nil {
 		log.Debug(err)

+ 7 - 1
store/v2/delete_command.go

@@ -14,6 +14,7 @@ func init() {
 type DeleteCommand struct {
 	Key       string `json:"key"`
 	Recursive bool   `json:"recursive"`
+	Dir       bool   `json:"dir"`
 }
 
 // The name of the delete command in the log
@@ -25,7 +26,12 @@ func (c *DeleteCommand) CommandName() string {
 func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
-	e, err := s.Delete(c.Key, c.Recursive)
+	if c.Recursive {
+		// recursive implies dir
+		c.Dir = true
+	}
+
+	e, err := s.Delete(c.Key, c.Dir, c.Recursive)
 
 	if err != nil {
 		log.Debug(err)

+ 2 - 1
store/v2/set_command.go

@@ -17,6 +17,7 @@ type SetCommand struct {
 	Key        string    `json:"key"`
 	Value      string    `json:"value"`
 	ExpireTime time.Time `json:"expireTime"`
+	Dir        bool      `json:"dir"`
 }
 
 // The name of the create command in the log
@@ -29,7 +30,7 @@ func (c *SetCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(store.Store)
 
 	// create a new node or replace the old node.
-	e, err := s.Set(c.Key, c.Value, c.ExpireTime)
+	e, err := s.Set(c.Key, c.Dir, c.Value, c.ExpireTime)
 
 	if err != nil {
 		log.Debug(err)

+ 1 - 1
tests/functional/v1_migration_test.go

@@ -92,7 +92,7 @@ func TestV1ClusterMigration(t *testing.T) {
 	body := tests.ReadBody(resp)
 	assert.Nil(t, err, "")
 	assert.Equal(t, resp.StatusCode, 400)
-	assert.Equal(t, string(body), `{"errorCode":100,"message":"Key Not Found","cause":"/message","index":11}`+"\n")
+	assert.Equal(t, string(body), `{"errorCode":100,"message":"Key not found","cause":"/message","index":11}`+"\n")
 
 	// Ensure TTL'd message is removed.
 	resp, err = tests.Get("http://localhost:4001/v2/keys/foo")

+ 3 - 1
third_party/github.com/coreos/raft/log.go

@@ -366,7 +366,8 @@ func (l *Log) setCommitIndex(index uint64) error {
 	// this is not error any more after limited the number of sending entries
 	// commit up to what we already have
 	if index > l.startIndex+uint64(len(l.entries)) {
-		debugln("raft.Log: Commit index", index, "set back to ", len(l.entries))
+		debugln("raft.StartIndex", l.startIndex)
+		debugln("raft.Log: Commit index", index, "set back to ", l.startIndex+uint64(len(l.entries)))
 		index = l.startIndex + uint64(len(l.entries))
 	}
 
@@ -386,6 +387,7 @@ func (l *Log) setCommitIndex(index uint64) error {
 	// follower 2 should reply success and let leader 3 update the committed index to 80
 
 	if index < l.commitIndex {
+		debugln("raft.Log: index", index, "committedIndex", l.commitIndex)
 		return nil
 	}