Browse Source

store: refactor

use defer statement to update `Stats` and report R/W Sucess/Failure, so
that the logic of Store's CURD operation and `Stats` update logic can be
separated.
mqliang 10 years ago
parent
commit
1c559bdb33
1 changed files with 103 additions and 70 deletions
  1. 103 70
      store/store.go

+ 103 - 70
store/store.go

@@ -112,20 +112,34 @@ func (s *store) Index() uint64 {
 // If recursive is true, it will return all the content under the node path.
 // If recursive is true, it will return all the content under the node path.
 // If sorted is true, it will sort the content by keys.
 // If sorted is true, it will sort the content by keys.
 func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
 func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
-	s.worldLock.RLock()
-	defer s.worldLock.RUnlock()
+	var err *etcdErr.Error
 
 
-	nodePath = path.Clean(path.Join("/", nodePath))
+	s.worldLock.Lock()
+	defer s.worldLock.Unlock()
 
 
-	n, err := s.internalGet(nodePath)
+	defer func() {
+		if err == nil {
+			s.Stats.Inc(GetSuccess)
+			if recursive {
+				reportReadSuccess(GetRecursive)
+			} else {
+				reportReadSuccess(Get)
+			}
+			return
+		}
 
 
-	if err != nil {
 		s.Stats.Inc(GetFail)
 		s.Stats.Inc(GetFail)
 		if recursive {
 		if recursive {
 			reportReadFailure(GetRecursive)
 			reportReadFailure(GetRecursive)
 		} else {
 		} else {
 			reportReadFailure(Get)
 			reportReadFailure(Get)
 		}
 		}
+	}()
+
+	nodePath = path.Clean(path.Join("/", nodePath))
+
+	n, err := s.internalGet(nodePath)
+	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -133,13 +147,6 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
 	e.EtcdIndex = s.CurrentIndex
 	e.EtcdIndex = s.CurrentIndex
 	e.Node.loadInternalNode(n, recursive, sorted, s.clock)
 	e.Node.loadInternalNode(n, recursive, sorted, s.clock)
 
 
-	s.Stats.Inc(GetSuccess)
-	if recursive {
-		reportReadSuccess(GetRecursive)
-	} else {
-		reportReadSuccess(Get)
-	}
-
 	return e, nil
 	return e, nil
 }
 }
 
 
@@ -147,26 +154,36 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
 // If the node has already existed, create will fail.
 // If the node has already existed, create will fail.
 // If any node on the path is a file, create will fail.
 // If any node on the path is a file, create will fail.
 func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
 func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
+	var err *etcdErr.Error
+
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
-	e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
 
 
-	if err == nil {
-		e.EtcdIndex = s.CurrentIndex
-		s.WatcherHub.notify(e)
-		s.Stats.Inc(CreateSuccess)
-		reportWriteSuccess(Create)
-	} else {
+	defer func() {
+		if err == nil {
+			s.Stats.Inc(CreateSuccess)
+			reportWriteSuccess(Create)
+			return
+		}
+
 		s.Stats.Inc(CreateFail)
 		s.Stats.Inc(CreateFail)
 		reportWriteFailure(Create)
 		reportWriteFailure(Create)
+	}()
+
+	e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
+	if err != nil {
+		return nil, err
 	}
 	}
 
 
-	return e, err
+	e.EtcdIndex = s.CurrentIndex
+	s.WatcherHub.notify(e)
+
+	return e, nil
 }
 }
 
 
 // Set creates or replace the node at nodePath.
 // Set creates or replace the node at nodePath.
 func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
 func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
-	var err error
+	var err *etcdErr.Error
 
 
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
@@ -175,10 +192,11 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
 		if err == nil {
 		if err == nil {
 			s.Stats.Inc(SetSuccess)
 			s.Stats.Inc(SetSuccess)
 			reportWriteSuccess(Set)
 			reportWriteSuccess(Set)
-		} else {
-			s.Stats.Inc(SetFail)
-			reportWriteFailure(Set)
+			return
 		}
 		}
+
+		s.Stats.Inc(SetFail)
+		reportWriteFailure(Set)
 	}()
 	}()
 
 
 	// Get prevNode value
 	// Get prevNode value
@@ -222,9 +240,22 @@ func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64)
 func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
 func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
 	value string, expireTime time.Time) (*Event, error) {
 	value string, expireTime time.Time) (*Event, error) {
 
 
+	var err *etcdErr.Error
+
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
 
 
+	defer func() {
+		if err == nil {
+			s.Stats.Inc(CompareAndSwapSuccess)
+			reportWriteSuccess(CompareAndSwap)
+			return
+		}
+
+		s.Stats.Inc(CompareAndSwapFail)
+		reportWriteFailure(CompareAndSwap)
+	}()
+
 	nodePath = path.Clean(path.Join("/", nodePath))
 	nodePath = path.Clean(path.Join("/", nodePath))
 	// we do not allow the user to change "/"
 	// we do not allow the user to change "/"
 	if s.readonlySet.Contains(nodePath) {
 	if s.readonlySet.Contains(nodePath) {
@@ -232,26 +263,20 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 	}
 	}
 
 
 	n, err := s.internalGet(nodePath)
 	n, err := s.internalGet(nodePath)
-
 	if err != nil {
 	if err != nil {
-		s.Stats.Inc(CompareAndSwapFail)
-		reportWriteFailure(CompareAndSwap)
 		return nil, err
 		return nil, err
 	}
 	}
-
 	if n.IsDir() { // can only compare and swap file
 	if n.IsDir() { // can only compare and swap file
-		s.Stats.Inc(CompareAndSwapFail)
-		reportWriteFailure(CompareAndSwap)
-		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
+		err = etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
+		return nil, err
 	}
 	}
 
 
 	// If both of the prevValue and prevIndex are given, we will test both of them.
 	// If both of the prevValue and prevIndex are given, we will test both of them.
 	// Command will be executed, only if both of the tests are successful.
 	// Command will be executed, only if both of the tests are successful.
 	if ok, which := n.Compare(prevValue, prevIndex); !ok {
 	if ok, which := n.Compare(prevValue, prevIndex); !ok {
 		cause := getCompareFailCause(n, which, prevValue, prevIndex)
 		cause := getCompareFailCause(n, which, prevValue, prevIndex)
-		s.Stats.Inc(CompareAndSwapFail)
-		reportWriteFailure(CompareAndSwap)
-		return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
+		err = etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
+		return nil, err
 	}
 	}
 
 
 	// update etcd index
 	// update etcd index
@@ -272,8 +297,6 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 	eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
 	eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
 
 
 	s.WatcherHub.notify(e)
 	s.WatcherHub.notify(e)
-	s.Stats.Inc(CompareAndSwapSuccess)
-	reportWriteSuccess(CompareAndSwap)
 
 
 	return e, nil
 	return e, nil
 }
 }
@@ -281,9 +304,22 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 // Delete deletes the node at the given path.
 // Delete deletes the node at the given path.
 // If the node is a directory, recursive must be true to delete it.
 // If the node is a directory, recursive must be true to delete it.
 func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
+	var err *etcdErr.Error
+
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
 
 
+	defer func() {
+		if err == nil {
+			s.Stats.Inc(DeleteSuccess)
+			reportWriteSuccess(Delete)
+			return
+		}
+
+		s.Stats.Inc(DeleteFail)
+		reportWriteFailure(Delete)
+	}()
+
 	nodePath = path.Clean(path.Join("/", nodePath))
 	nodePath = path.Clean(path.Join("/", nodePath))
 	// we do not allow the user to change "/"
 	// we do not allow the user to change "/"
 	if s.readonlySet.Contains(nodePath) {
 	if s.readonlySet.Contains(nodePath) {
@@ -296,10 +332,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 	}
 	}
 
 
 	n, err := s.internalGet(nodePath)
 	n, err := s.internalGet(nodePath)
-
 	if err != nil { // if the node does not exist, return error
 	if err != nil { // if the node does not exist, return error
-		s.Stats.Inc(DeleteFail)
-		reportWriteFailure(Delete)
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -319,10 +352,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 	}
 	}
 
 
 	err = n.Remove(dir, recursive, callback)
 	err = n.Remove(dir, recursive, callback)
-
 	if err != nil {
 	if err != nil {
-		s.Stats.Inc(DeleteFail)
-		reportWriteFailure(Delete)
 		return nil, err
 		return nil, err
 	}
 	}
 
 
@@ -331,29 +361,33 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 
 
 	s.WatcherHub.notify(e)
 	s.WatcherHub.notify(e)
 
 
-	s.Stats.Inc(DeleteSuccess)
-	reportWriteSuccess(Delete)
-
 	return e, nil
 	return e, nil
 }
 }
 
 
 func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
 func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
-	nodePath = path.Clean(path.Join("/", nodePath))
+	var err *etcdErr.Error
 
 
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
 
 
-	n, err := s.internalGet(nodePath)
+	defer func() {
+		if err == nil {
+			s.Stats.Inc(CompareAndDeleteSuccess)
+			reportWriteSuccess(CompareAndDelete)
+			return
+		}
 
 
-	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(CompareAndDeleteFail)
 		s.Stats.Inc(CompareAndDeleteFail)
 		reportWriteFailure(CompareAndDelete)
 		reportWriteFailure(CompareAndDelete)
+	}()
+
+	nodePath = path.Clean(path.Join("/", nodePath))
+
+	n, err := s.internalGet(nodePath)
+	if err != nil { // if the node does not exist, return error
 		return nil, err
 		return nil, err
 	}
 	}
-
 	if n.IsDir() { // can only compare and delete file
 	if n.IsDir() { // can only compare and delete file
-		s.Stats.Inc(CompareAndSwapFail)
-		reportWriteFailure(CompareAndDelete)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
 	}
 	}
 
 
@@ -361,8 +395,6 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
 	// Command will be executed, only if both of the tests are successful.
 	// Command will be executed, only if both of the tests are successful.
 	if ok, which := n.Compare(prevValue, prevIndex); !ok {
 	if ok, which := n.Compare(prevValue, prevIndex); !ok {
 		cause := getCompareFailCause(n, which, prevValue, prevIndex)
 		cause := getCompareFailCause(n, which, prevValue, prevIndex)
-		s.Stats.Inc(CompareAndDeleteFail)
-		reportWriteFailure(CompareAndDelete)
 		return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
 		return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
 	}
 	}
 
 
@@ -384,8 +416,6 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
 	}
 	}
 
 
 	s.WatcherHub.notify(e)
 	s.WatcherHub.notify(e)
-	s.Stats.Inc(CompareAndDeleteSuccess)
-	reportWriteSuccess(CompareAndDelete)
 
 
 	return e, nil
 	return e, nil
 }
 }
@@ -423,7 +453,6 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-
 	}
 	}
 
 
 	return curr, nil
 	return curr, nil
@@ -433,9 +462,22 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
 func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
 func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
+	var err *etcdErr.Error
+
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
 
 
+	defer func() {
+		if err == nil {
+			s.Stats.Inc(UpdateSuccess)
+			reportWriteSuccess(Update)
+			return
+		}
+
+		s.Stats.Inc(UpdateFail)
+		reportWriteFailure(Update)
+	}()
+
 	nodePath = path.Clean(path.Join("/", nodePath))
 	nodePath = path.Clean(path.Join("/", nodePath))
 	// we do not allow the user to change "/"
 	// we do not allow the user to change "/"
 	if s.readonlySet.Contains(nodePath) {
 	if s.readonlySet.Contains(nodePath) {
@@ -445,25 +487,19 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
 	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
 	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
 
 
 	n, err := s.internalGet(nodePath)
 	n, err := s.internalGet(nodePath)
-
 	if err != nil { // if the node does not exist, return error
 	if err != nil { // if the node does not exist, return error
-		s.Stats.Inc(UpdateFail)
-		reportWriteFailure(Update)
 		return nil, err
 		return nil, err
 	}
 	}
+	if n.IsDir() && len(newValue) != 0 {
+		// if the node is a directory, we cannot update value to non-empty
+		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
+	}
 
 
 	e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
 	e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
 	e.EtcdIndex = nextIndex
 	e.EtcdIndex = nextIndex
 	e.PrevNode = n.Repr(false, false, s.clock)
 	e.PrevNode = n.Repr(false, false, s.clock)
 	eNode := e.Node
 	eNode := e.Node
 
 
-	if n.IsDir() && len(newValue) != 0 {
-		// if the node is a directory, we cannot update value to non-empty
-		s.Stats.Inc(UpdateFail)
-		reportWriteFailure(Update)
-		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
-	}
-
 	n.Write(newValue, nextIndex)
 	n.Write(newValue, nextIndex)
 
 
 	if n.IsDir() {
 	if n.IsDir() {
@@ -481,16 +517,13 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
 
 
 	s.WatcherHub.notify(e)
 	s.WatcherHub.notify(e)
 
 
-	s.Stats.Inc(UpdateSuccess)
-	reportWriteSuccess(Update)
-
 	s.CurrentIndex = nextIndex
 	s.CurrentIndex = nextIndex
 
 
 	return e, nil
 	return e, nil
 }
 }
 
 
 func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
 func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
-	expireTime time.Time, action string) (*Event, error) {
+	expireTime time.Time, action string) (*Event, *etcdErr.Error) {
 
 
 	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
 	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1