Browse Source

Merge pull request #223 from xiangli-cmu/refactoring_store

refactor create do not need to check existence first
Xiang Li 12 years ago
parent
commit
d8d712e8f0
2 changed files with 32 additions and 25 deletions
  1. 20 24
      store/store.go
  2. 12 1
      store/store_test.go

+ 20 - 24
store/store.go

@@ -245,9 +245,10 @@ func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index ui
 	if err != nil {
 		err.Index = index
 		err.Term = term
+		return nil, err
 	}
 
-	return c, err
+	return c, nil
 }
 
 // walk function walks all the nodePath and apply the walkFunc on each directory
@@ -275,25 +276,14 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string
 func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool,
 	expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
 
+	s.Index, s.Term = index, term
+
 	if incrementalSuffix { // append unique incremental suffix to the node path
 		nodePath += "_" + strconv.FormatUint(index, 10)
 	}
 
 	nodePath = path.Clean(path.Join("/", nodePath))
 
-	// make sure we can create the node
-	_, err := s.internalGet(nodePath, index, term)
-
-	if err == nil && !force { // key already exists
-		s.Stats.Inc(SetFail)
-		return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
-	}
-
-	if err != nil && err.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
-		s.Stats.Inc(SetFail)
-		return nil, err
-	}
-
 	dir, newNodeName := path.Split(nodePath)
 
 	// walk through the nodePath, create dirs and get the last directory node
@@ -301,35 +291,37 @@ func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix
 
 	if err != nil {
 		s.Stats.Inc(SetFail)
+		err.Index, err.Term = s.Index, s.Term
 		return nil, err
 	}
 
 	e := newEvent(action, nodePath, s.Index, s.Term)
 
-	if force { // force will try to replace a existing file
-		n, _ := d.GetChild(newNodeName)
-		if n != nil {
+	n, _ := d.GetChild(newNodeName)
+
+	// force will try to replace a existing file
+	if n != nil {
+		if force {
 			if n.IsDir() {
 				return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
 			}
 			e.PrevValue, _ = n.Read()
 
 			n.Remove(false, nil)
-
+		} else {
+			return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
 		}
 	}
 
-	var n *Node
-
 	if len(value) != 0 { // create file
 		e.Value = value
 
-		n = newKV(nodePath, value, s.Index, s.Term, d, "", expireTime)
+		n = newKV(nodePath, value, index, term, d, "", expireTime)
 
 	} else { // create directory
 		e.Dir = true
 
-		n = newDir(nodePath, s.Index, s.Term, d, "", expireTime)
+		n = newDir(nodePath, index, term, d, "", expireTime)
 
 	}
 
@@ -388,10 +380,14 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 // If it does not exist, this function will create a new directory and return the pointer to that node.
 // If it is a file, this function will return error.
 func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
-	subDir, ok := parent.Children[dirName]
+	node, ok := parent.Children[dirName]
 
 	if ok {
-		return subDir, nil
+		if node.IsDir() {
+			return node, nil
+		}
+
+		return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm)
 	}
 
 	n := newDir(path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)

+ 12 - 1
store/store_test.go

@@ -21,6 +21,17 @@ func TestCreateAndGet(t *testing.T) {
 
 	s.Delete("/foobar", true, 1, 1)
 
+	s.Create("/foobar/foo", "bar", false, false, Permanent, 1, 1)
+
+	// already exist, create should fail
+	_, err = s.Create("/foobar", "bar", false, false, Permanent, 1, 1)
+
+	if err == nil {
+		t.Fatal("Create should fail")
+	}
+
+	s.Delete("/foobar", true, 1, 1)
+
 	// this should create successfully
 	createAndGet(s, "/foobar", t)
 	createAndGet(s, "/foo/bar", t)
@@ -365,7 +376,7 @@ func TestWatch(t *testing.T) {
 	c, _ = s.Watch("/foo", true, 0, 7, 1)
 	s.Delete("/foo/foo/boo", false, 8, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" || e.Action != Delete {
+	if e == nil || e.Key != "/foo/foo/boo" || e.Action != Delete {
 		t.Fatal("watch for Delete subdirectory fails")
 	}