|
|
@@ -51,6 +51,7 @@ type Store interface {
|
|
|
CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
|
|
|
value string, expireTime time.Time) (*Event, error)
|
|
|
Delete(nodePath string, recursive, dir bool) (*Event, error)
|
|
|
+ CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
|
|
|
Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
|
|
|
|
|
|
Save() ([]byte, error)
|
|
|
@@ -207,37 +208,37 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- if n.IsDir() { // can only test and set file
|
|
|
+ if n.IsDir() { // can only compare and swap file
|
|
|
s.Stats.Inc(CompareAndSwapFail)
|
|
|
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
|
|
}
|
|
|
|
|
|
// If both of the prevValue and prevIndex are given, we will test both of them.
|
|
|
// Command will be executed, only if both of the tests are successful.
|
|
|
- if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
|
|
|
- // update etcd index
|
|
|
- s.CurrentIndex++
|
|
|
+ if !n.Compare(prevValue, prevIndex) {
|
|
|
+ cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
|
|
|
+ s.Stats.Inc(CompareAndSwapFail)
|
|
|
+ return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
|
|
+ }
|
|
|
|
|
|
- e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
|
|
|
- eNode := e.Node
|
|
|
+ // update etcd index
|
|
|
+ s.CurrentIndex++
|
|
|
|
|
|
- eNode.PrevValue = n.Value
|
|
|
+ e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
|
|
|
+ eNode := e.Node
|
|
|
|
|
|
- // if test succeed, write the value
|
|
|
- n.Write(value, s.CurrentIndex)
|
|
|
- n.UpdateTTL(expireTime)
|
|
|
+ eNode.PrevValue = n.Value
|
|
|
|
|
|
- eNode.Value = value
|
|
|
- eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
|
|
+ // if test succeed, write the value
|
|
|
+ n.Write(value, s.CurrentIndex)
|
|
|
+ n.UpdateTTL(expireTime)
|
|
|
|
|
|
- s.WatcherHub.notify(e)
|
|
|
- s.Stats.Inc(CompareAndSwapSuccess)
|
|
|
- return e, nil
|
|
|
- }
|
|
|
+ eNode.Value = value
|
|
|
+ eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
|
|
|
|
|
|
- cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
|
|
|
- s.Stats.Inc(CompareAndSwapFail)
|
|
|
- return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
|
|
+ s.WatcherHub.notify(e)
|
|
|
+ s.Stats.Inc(CompareAndSwapSuccess)
|
|
|
+ return e, nil
|
|
|
}
|
|
|
|
|
|
// Delete function deletes the node at the given path.
|
|
|
@@ -257,8 +258,6 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|
|
dir = true
|
|
|
}
|
|
|
|
|
|
- nextIndex := s.CurrentIndex + 1
|
|
|
-
|
|
|
n, err := s.internalGet(nodePath)
|
|
|
|
|
|
if err != nil { // if the node does not exist, return error
|
|
|
@@ -266,6 +265,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
+ nextIndex := s.CurrentIndex + 1
|
|
|
e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
|
|
|
eNode := e.Node
|
|
|
|
|
|
@@ -276,7 +276,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|
|
}
|
|
|
|
|
|
callback := func(path string) { // notify function
|
|
|
- // notify the watchers with delted set true
|
|
|
+ // notify the watchers with deleted set true
|
|
|
s.WatcherHub.notifyWatchers(e, path, true)
|
|
|
}
|
|
|
|
|
|
@@ -296,9 +296,52 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
|
|
return e, nil
|
|
|
}
|
|
|
|
|
|
+func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
|
|
|
+ nodePath = path.Clean(path.Join("/", nodePath))
|
|
|
+
|
|
|
+ s.worldLock.Lock()
|
|
|
+ defer s.worldLock.Unlock()
|
|
|
+
|
|
|
+ n, err := s.internalGet(nodePath)
|
|
|
+
|
|
|
+ if err != nil { // if the node does not exist, return error
|
|
|
+ s.Stats.Inc(CompareAndDeleteFail)
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if n.IsDir() { // can only compare and delete file
|
|
|
+ s.Stats.Inc(CompareAndSwapFail)
|
|
|
+ return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
|
|
+ }
|
|
|
+
|
|
|
+ // If both of the prevValue and prevIndex are given, we will test both of them.
|
|
|
+ // Command will be executed, only if both of the tests are successful.
|
|
|
+ if !n.Compare(prevValue, prevIndex) {
|
|
|
+ cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
|
|
|
+ s.Stats.Inc(CompareAndDeleteFail)
|
|
|
+ return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
|
|
+ }
|
|
|
+
|
|
|
+ // update etcd index
|
|
|
+ s.CurrentIndex++
|
|
|
+
|
|
|
+ e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
|
|
|
+
|
|
|
+ callback := func(path string) { // notify function
|
|
|
+ // notify the watchers with deleted set true
|
|
|
+ s.WatcherHub.notifyWatchers(e, path, true)
|
|
|
+ }
|
|
|
+
|
|
|
+ // delete a key-value pair, no error should happen
|
|
|
+ n.Remove(false, false, callback)
|
|
|
+
|
|
|
+ s.WatcherHub.notify(e)
|
|
|
+ s.Stats.Inc(CompareAndDeleteSuccess)
|
|
|
+ return e, nil
|
|
|
+}
|
|
|
+
|
|
|
func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
|
|
|
key = path.Clean(path.Join("/", key))
|
|
|
-
|
|
|
nextIndex := s.CurrentIndex + 1
|
|
|
|
|
|
s.worldLock.RLock()
|