Browse Source

teach store.Store about CompareAndDelete

rick 12 years ago
parent
commit
702cf1cc36
2 changed files with 93 additions and 2 deletions
  1. 53 2
      store/store.go
  2. 40 0
      store/store_test.go

+ 53 - 2
store/store.go

@@ -260,7 +260,7 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
 	}
 
 	callback := func(path string) { // notify function
-		// notify the watchers with delted set true
+		// notify the watchers with deleted set true
 		s.WatcherHub.notifyWatchers(e, path, true)
 	}
 
@@ -280,6 +280,58 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
 	return e, nil
 }
 
+func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64,
+) (*Event, error) {
+
+	nodePath = path.Clean(path.Join("/", nodePath))
+
+	s.worldLock.Lock()
+	defer s.worldLock.Unlock()
+
+	n, err := s.internalGet(nodePath)
+
+	if err != nil { // if the node does not exist, return error
+		s.Stats.Inc(DeleteFail)
+		return nil, err
+	}
+
+	if n.IsDir() { // can only test and set file
+		s.Stats.Inc(DeleteFail)
+		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
+	}
+
+	// If both of the prevValue and prevIndex are given, we will test both of them.
+	// Command will be executed, only if both of the tests are successful.
+	if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
+
+		e := newEvent(Delete, nodePath, s.CurrentIndex+1)
+		e.PrevValue = n.Value
+
+		callback := func(path string) { // notify function
+			// notify the watchers with deleted set true
+			s.WatcherHub.notifyWatchers(e, path, true)
+		}
+
+		err = n.Remove(false, callback)
+
+		if err != nil {
+			s.Stats.Inc(DeleteFail)
+			return nil, err
+		}
+
+		// update etcd index
+		s.CurrentIndex++
+
+		s.WatcherHub.notify(e)
+		s.Stats.Inc(DeleteSuccess)
+		return e, nil
+	}
+
+	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
+	s.Stats.Inc(DeleteFail)
+	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
+}
+
 func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
 	prefix = path.Clean(path.Join("/", prefix))
 
@@ -396,7 +448,6 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 		expireTime = Permanent
 	}
 
-
 	dir, newNodeName := path.Split(nodePath)
 
 	// walk through the nodePath, create dirs and get the last directory node

+ 40 - 0
store/store_test.go

@@ -204,6 +204,46 @@ func TestStoreDeleteValue(t *testing.T) {
 	assert.Equal(t, e.Action, "delete", "")
 }
 
+func TestStoreCompareAndDeletePrevValue(t *testing.T) {
+	s := newStore()
+	s.Create("/foo", "bar", false, Permanent)
+	e, err := s.CompareAndDelete("/foo", "bar", 0)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "delete", "")
+}
+
+func TestStoreCompareAndDeletePrevValueFailsIfNotMatch(t *testing.T) {
+	s := newStore()
+	s.Create("/foo", "bar", false, Permanent)
+	e, _err := s.CompareAndDelete("/foo", "baz", 0)
+	err := _err.(*etcdErr.Error)
+	assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
+	assert.Equal(t, err.Message, "Test Failed", "")
+	assert.Nil(t, e, "")
+	e, _ = s.Get("/foo", false, false)
+	assert.Equal(t, e.Value, "bar", "")
+}
+
+func TestStoreCompareAndDeletePrevIndex(t *testing.T) {
+	s := newStore()
+	s.Create("/foo", "bar", false, Permanent)
+	e, err := s.CompareAndDelete("/foo", "", 1)
+	assert.Nil(t, err, "")
+	assert.Equal(t, e.Action, "delete", "")
+}
+
+func TestStoreCompareAndDeletePrevIndexFailsIfNotMatch(t *testing.T) {
+	s := newStore()
+	s.Create("/foo", "bar", false, Permanent)
+	e, _err := s.CompareAndDelete("/foo", "baz", 100)
+	err := _err.(*etcdErr.Error)
+	assert.Equal(t, err.ErrorCode, etcdErr.EcodeTestFailed, "")
+	assert.Equal(t, err.Message, "Test Failed", "")
+	assert.Nil(t, e, "")
+	e, _ = s.Get("/foo", false, false)
+	assert.Equal(t, e.Value, "bar", "")
+}
+
 // Ensure that the store can delete a directory if recursive is specified.
 func TestStoreDeleteDiretory(t *testing.T) {
 	s := newStore()