Browse Source

rename to DupCnt, duped; add some comments, maintained some format, add notification for immediate expiration

evan-gu 12 years ago
parent
commit
6f591032ef
6 changed files with 30 additions and 33 deletions
  1. 6 9
      store/event.go
  2. 0 3
      store/event_test.go
  3. 17 5
      store/node.go
  4. 2 2
      store/store.go
  5. 4 10
      store/store_test.go
  6. 1 4
      store/watcher.go

+ 6 - 9
store/event.go

@@ -76,7 +76,6 @@ func (eq *eventQueue) back() int {
 }
 
 func (eq *eventQueue) insert(e *Event) {
-
 	index := (eq.back() + 1) % eq.Capacity
 
 	eq.Events[index] = e
@@ -94,7 +93,7 @@ type EventHistory struct {
 	StartIndex uint64
 	LastIndex  uint64
 	LastTerm   uint64
-	DupIndex   uint64
+	DupCnt     uint64 // help to compute the watching point with duplicated indexes in the queue
 	rwl        sync.RWMutex
 }
 
@@ -112,16 +111,16 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 	eh.rwl.Lock()
 	defer eh.rwl.Unlock()
 
-	DupIndex := uint64(0)
+	duped := uint64(0)
 
 	if e.Index == UndefIndex {
 		e.Index = eh.LastIndex
-		DupIndex = 1
+		duped = 1
 	}
 
 	if e.Term == UndefTerm {
 		e.Term = eh.LastTerm
-		DupIndex = 1
+		duped = 1
 	}
 
 	eh.Queue.insert(e)
@@ -130,7 +129,7 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 
 	eh.LastIndex = e.Index
 	eh.LastTerm = e.Term
-	eh.DupIndex += DupIndex
+	eh.DupCnt += duped
 
 	return e
 }
@@ -141,7 +140,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 
-	start := index - eh.StartIndex + eh.DupIndex
+	start := index - eh.StartIndex + eh.DupCnt
 
 	// the index should locate after the event history's StartIndex
 	// and before its size
@@ -172,13 +171,11 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 			return nil, nil
 		}
 	}
-
 }
 
 // clone will be protected by a stop-world lock
 // do not need to obtain internal lock
 func (eh *EventHistory) clone() *EventHistory {
-
 	clonedQueue := eventQueue{
 		Capacity: eh.Queue.Capacity,
 		Events:   make([]*Event, eh.Queue.Capacity),

+ 0 - 3
store/event_test.go

@@ -28,9 +28,7 @@ func TestEventQueue(t *testing.T) {
 		}
 		j++
 		i = (i + 1) % eh.Queue.Capacity
-
 	}
-
 }
 
 func TestScanHistory(t *testing.T) {
@@ -65,5 +63,4 @@ func TestScanHistory(t *testing.T) {
 	if e != nil {
 		t.Fatalf("bad index shoud reuturn nil")
 	}
-
 }

+ 17 - 5
store/node.go

@@ -66,7 +66,6 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node
 // If the node is a directory and recursive is true, the function will recursively remove
 // add nodes under the receiver node.
 func (n *Node) Remove(recursive bool, callback func(path string)) error {
-
 	n.mu.Lock()
 	defer n.mu.Unlock()
 
@@ -187,7 +186,6 @@ func (n *Node) GetFile(name string) (*Node, error) {
 	}
 
 	return nil, nil
-
 }
 
 // Add function adds a node to the receiver node.
@@ -216,7 +214,6 @@ func (n *Node) Add(child *Node) error {
 	n.Children[name] = child
 
 	return nil
-
 }
 
 // Clone function clone the node recursively and return the new node.
@@ -251,11 +248,23 @@ func (n *Node) recoverAndclean(s *Store) {
 	n.Expire(s)
 }
 
+// Expire function will test if the node is expired.
+// if the node is already expired, delete the node and return.
+// if the node is permemant (this shouldn't happen), return at once.
+// else wait for a period time, then remove the node. and notify the watchhub.
 func (n *Node) Expire(s *Store) {
 	expired, duration := n.IsExpired()
 
 	if expired { // has been expired
+
+		// since the parent function of Expire() runs serially,
+		// there is no need for lock here
+		e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
+		s.WatcherHub.notify(e)
+
 		n.Remove(true, nil)
+		s.Stats.Inc(ExpireCount)
+
 		return
 	}
 
@@ -267,20 +276,23 @@ func (n *Node) Expire(s *Store) {
 		select {
 		// if timeout, delete the node
 		case <-time.After(duration):
+
+			// Lock to avoid race
 			s.worldLock.Lock()
 
 			e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
 			s.WatcherHub.notify(e)
+
 			n.Remove(true, nil)
 			s.Stats.Inc(ExpireCount)
 
 			s.worldLock.Unlock()
+
 			return
 
 		// if stopped, return
 		case <-n.stopExpire:
 			return
-
 		}
 	}()
 }
@@ -294,7 +306,6 @@ func (n *Node) IsHidden() bool {
 	_, name := path.Split(n.Path)
 
 	return name[0] == '_'
-
 }
 
 func (n *Node) IsPermanent() bool {
@@ -355,6 +366,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 		if sorted {
 			sort.Sort(pair)
 		}
+
 		return pair
 	}
 

+ 2 - 2
store/store.go

@@ -164,7 +164,6 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
 func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
-
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 
@@ -197,10 +196,11 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 
 	// update ttl
 	n.UpdateTTL(expireTime, s)
+
 	e.Expiration = &n.ExpireTime
 	e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
-
 	s.WatcherHub.notify(e)
+
 	s.Stats.Inc(UpdateSuccess)
 
 	return e, nil

+ 4 - 10
store/store_test.go

@@ -52,7 +52,6 @@ func TestCreateAndGet(t *testing.T) {
 	if err != nil {
 		t.Fatal("Cannot create /fooDir/bar = bar")
 	}
-
 }
 
 func TestUpdateFile(t *testing.T) {
@@ -81,7 +80,6 @@ func TestUpdateFile(t *testing.T) {
 	}
 
 	// create a directory, update its ttl, to see if it will be deleted
-
 	_, err = s.Create("/foo/foo", "", Permanent, 3, 1)
 
 	if err != nil {
@@ -237,7 +235,6 @@ func TestRemove(t *testing.T) {
 	if err == nil || err.Error() != "Key Not Found" {
 		t.Fatalf("can get the node after deletion ")
 	}
-
 }
 
 func TestExpire(t *testing.T) {
@@ -280,7 +277,6 @@ func TestExpire(t *testing.T) {
 	if err != nil {
 		t.Fatalf("cannot delete the node before expiration", err.Error())
 	}
-
 }
 
 func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
@@ -314,7 +310,6 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
 	if e.PrevValue != "car" || e.Value != "bar" {
 		t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar")
 	}
-
 }
 
 func TestWatch(t *testing.T) {
@@ -404,7 +399,6 @@ func TestWatch(t *testing.T) {
 	if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 {
 		t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e)
 	}
-
 }
 
 func TestSort(t *testing.T) {
@@ -479,8 +473,11 @@ func TestSaveAndRecover(t *testing.T) {
 			panic(err)
 		}
 	}
+
+	// lock to avoid racing with Expire()
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
+
 	if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex {
 		t.Fatal("Error recovered event history start index")
 	}
@@ -497,7 +494,6 @@ func TestSaveAndRecover(t *testing.T) {
 	if err == nil || err.Error() != "Key Not Found" {
 		t.Fatalf("can get the node after deletion ")
 	}
-
 }
 
 // GenKeys randomly generate num of keys with max depth
@@ -513,6 +509,7 @@ func GenKeys(num int, depth int) []string {
 			keys[i] += "/" + strconv.Itoa(rand.Int())
 		}
 	}
+
 	return keys
 }
 
@@ -532,11 +529,9 @@ func createAndGet(s *Store, path string, t *testing.T) {
 	if e.Value != "bar" {
 		t.Fatalf("expect value of %s is bar [%s]", path, e.Value)
 	}
-
 }
 
 func recursiveTestSort(k KeyValuePair, t *testing.T) {
-
 	for i, v := range k.KVPairs[:len(k.KVPairs)-1] {
 		if v.Key >= k.KVPairs[i+1].Key {
 			t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key)
@@ -545,7 +540,6 @@ func recursiveTestSort(k KeyValuePair, t *testing.T) {
 		if v.Dir {
 			recursiveTestSort(v, t)
 		}
-
 	}
 
 	if v := k.KVPairs[len(k.KVPairs)-1]; v.Dir {

+ 1 - 4
store/watcher.go

@@ -74,21 +74,19 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
 		notifiedAll := true
 
 		for {
-
 			if curr == nil { // we have reached the end of the list
-
 				if notifiedAll {
 					// if we have notified all watcher in the list
 					// we can delete the list
 					delete(wh.watchers, path)
 				}
+
 				break
 			}
 
 			next := curr.Next() // save the next
 
 			w, _ := curr.Value.(*watcher)
-
 			if (w.recursive || force || e.Key == path) && e.Index >= w.sinceIndex {
 				w.eventChan <- e
 				l.Remove(curr)
@@ -98,7 +96,6 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) {
 			}
 
 			curr = next // go to the next one
-
 		}
 	}
 }