Browse Source

fix watcher_hub

Xiang Li 12 years ago
parent
commit
c307b6abca
6 changed files with 77 additions and 47 deletions
  1. 19 11
      store/event_history.go
  2. 6 6
      store/event_test.go
  3. 3 2
      store/store.go
  4. 8 3
      store/store_test.go
  5. 0 1
      store/watcher.go
  6. 41 24
      store/watcher_hub.go

+ 19 - 11
store/event_history.go

@@ -31,19 +31,14 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 	eh.rwl.Lock()
 	eh.rwl.Lock()
 	defer eh.rwl.Unlock()
 	defer eh.rwl.Unlock()
 
 
-	var duped uint64
-
-	if e.Index == UndefIndex {
-		e.Index = eh.LastIndex
-		e.Term = eh.LastTerm
-		duped = 1
+	if e.Index == eh.LastIndex {
+		eh.DupCnt += 1
 	}
 	}
 
 
 	eh.Queue.insert(e)
 	eh.Queue.insert(e)
 
 
 	eh.LastIndex = e.Index
 	eh.LastIndex = e.Index
 	eh.LastTerm = e.Term
 	eh.LastTerm = e.Term
-	eh.DupCnt += duped
 
 
 	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
 	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
 
 
@@ -52,7 +47,7 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 
 
 // scan function is enumerating events from the index in history and
 // scan function is enumerating events from the index in history and
 // stops till the first point where the key has identified prefix
 // stops till the first point where the key has identified prefix
-func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
+func (eh *EventHistory) scan(prefix string, index uint64) ([]*Event, *etcdErr.Error) {
 	eh.rwl.RLock()
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 	defer eh.rwl.RUnlock()
 
 
@@ -73,16 +68,29 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Erro
 
 
 	i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
 	i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
 
 
+	events := make([]*Event, 0)
+	var eventIndex uint64
+
 	for {
 	for {
 		e := eh.Queue.Events[i]
 		e := eh.Queue.Events[i]
+
+		if eventIndex != 0 && eventIndex != e.Index {
+			return events, nil
+		}
+
 		if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
 		if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
-			return e, nil
+			eventIndex = e.Index
+			events = append(events, e)
 		}
 		}
 
 
 		i = (i + 1) % eh.Queue.Capacity
 		i = (i + 1) % eh.Queue.Capacity
 
 
-		if i == eh.Queue.back() { // find nothing, return and watch from current index
-			return nil, nil
+		if i == eh.Queue.back() {
+			if eventIndex == 0 { // find nothing, return and watch from current index
+				return nil, nil
+			}
+
+			return events, nil
 		}
 		}
 	}
 	}
 }
 }

+ 6 - 6
store/event_test.go

@@ -42,20 +42,20 @@ func TestScanHistory(t *testing.T) {
 	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1))
 	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1))
 
 
 	e, err := eh.scan("/foo", 1)
 	e, err := eh.scan("/foo", 1)
-	if err != nil || e.Index != 1 {
-		t.Fatalf("scan error [/foo] [1] %v", e.Index)
+	if err != nil || e[0].Index != 1 {
+		t.Fatalf("scan error [/foo] [1] %v", e[0].Index)
 	}
 	}
 
 
 	e, err = eh.scan("/foo/bar", 1)
 	e, err = eh.scan("/foo/bar", 1)
 
 
-	if err != nil || e.Index != 2 {
-		t.Fatalf("scan error [/foo/bar] [2] %v", e.Index)
+	if err != nil || e[0].Index != 2 {
+		t.Fatalf("scan error [/foo/bar] [2] %v", e[0].Index)
 	}
 	}
 
 
 	e, err = eh.scan("/foo/bar", 3)
 	e, err = eh.scan("/foo/bar", 3)
 
 
-	if err != nil || e.Index != 4 {
-		t.Fatalf("scan error [/foo/bar/bar] [4] %v", e.Index)
+	if err != nil || e[0].Index != 4 {
+		t.Fatalf("scan error [/foo/bar/bar] [4] %v", e[0].Index)
 	}
 	}
 
 
 	e, err = eh.scan("/foo/bar", 6)
 	e, err = eh.scan("/foo/bar", 6)

+ 3 - 2
store/store.go

@@ -435,10 +435,12 @@ func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 }
 }
 
 
 // deleteExpiredKyes will delete all
 // deleteExpiredKyes will delete all
-func (s *store) deleteExpiredKeys(cutoff time.Time) {
+func (s *store) deleteExpiredKeys(cutoff time.Time, index uint64, term uint64) {
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
 
 
+	s.Index, s.Term = index, term
+
 	for {
 	for {
 		node := s.ttlKeyHeap.top()
 		node := s.ttlKeyHeap.top()
 		if node == nil || node.ExpireTime.After(cutoff) {
 		if node == nil || node.ExpireTime.After(cutoff) {
@@ -497,7 +499,6 @@ func (s *store) Save() ([]byte, error) {
 	b, err := json.Marshal(clonedStore)
 	b, err := json.Marshal(clonedStore)
 
 
 	if err != nil {
 	if err != nil {
-		fmt.Println(err)
 		return nil, err
 		return nil, err
 	}
 	}
 
 

+ 8 - 3
store/store_test.go

@@ -344,13 +344,18 @@ func TestStoreWatchExpire(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	go mockSyncService(s.deleteExpiredKeys)
 	go mockSyncService(s.deleteExpiredKeys)
 	s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond), 2, 1)
 	s.Create("/foo", "bar", false, time.Now().Add(500*time.Millisecond), 2, 1)
-	c, _ := s.Watch("/foo", false, 0, 0, 1)
+	s.Create("/foofoo", "barbarbar", false, time.Now().Add(500*time.Millisecond), 2, 1)
+
+	c, _ := s.Watch("/", true, 0, 0, 1)
 	e := nbselect(c)
 	e := nbselect(c)
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
 	time.Sleep(600 * time.Millisecond)
 	time.Sleep(600 * time.Millisecond)
 	e = nbselect(c)
 	e = nbselect(c)
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Action, "expire", "")
 	assert.Equal(t, e.Key, "/foo", "")
 	assert.Equal(t, e.Key, "/foo", "")
+	e = nbselect(c)
+	assert.Equal(t, e.Action, "expire", "")
+	assert.Equal(t, e.Key, "/foofoo", "")
 }
 }
 
 
 // Ensure that the store can recover from a previously saved state.
 // Ensure that the store can recover from a previously saved state.
@@ -409,9 +414,9 @@ func nbselect(c <-chan *Event) *Event {
 	}
 	}
 }
 }
 
 
-func mockSyncService(f func(now time.Time)) {
+func mockSyncService(f func(now time.Time, index uint64, term uint64)) {
 	ticker := time.Tick(time.Millisecond * 500)
 	ticker := time.Tick(time.Millisecond * 500)
 	for now := range ticker {
 	for now := range ticker {
-		f(now)
+		f(now, 2, 1)
 	}
 	}
 }
 }

+ 0 - 1
store/watcher.go

@@ -24,7 +24,6 @@ func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {
 	// at the file we need to delete.
 	// at the file we need to delete.
 	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
 	// For example a watcher is watching at "/foo/bar". And we deletes "/foo". The watcher
 	// should get notified even if "/foo" is not the path it is watching.
 	// should get notified even if "/foo" is not the path it is watching.
-
 	if (w.recursive || originalPath || deleted) && e.Index >= w.sinceIndex {
 	if (w.recursive || originalPath || deleted) && e.Index >= w.sinceIndex {
 		w.eventChan <- e
 		w.eventChan <- e
 		return true
 		return true

+ 41 - 24
store/watcher_hub.go

@@ -19,7 +19,8 @@ type watcherHub struct {
 	watchers        map[string]*list.List
 	watchers        map[string]*list.List
 	count           int64 // current number of watchers.
 	count           int64 // current number of watchers.
 	EventHistory    *EventHistory
 	EventHistory    *EventHistory
-	pendingWatchers *list.List
+	pendingWatchers map[*list.Element]*list.List
+	pendingList     map[*list.List]string
 }
 }
 
 
 // newWatchHub creates a watchHub. The capacity determines how many events we will
 // newWatchHub creates a watchHub. The capacity determines how many events we will
@@ -30,7 +31,8 @@ func newWatchHub(capacity int) *watcherHub {
 	return &watcherHub{
 	return &watcherHub{
 		watchers:        make(map[string]*list.List),
 		watchers:        make(map[string]*list.List),
 		EventHistory:    newEventHistory(capacity),
 		EventHistory:    newEventHistory(capacity),
-		pendingWatchers: list.New(),
+		pendingWatchers: make(map[*list.Element]*list.List),
+		pendingList:     make(map[*list.List]string),
 	}
 	}
 }
 }
 
 
@@ -39,23 +41,30 @@ func newWatchHub(capacity int) *watcherHub {
 // If recursive is false, the first change after index at prefix will be sent to the event channel.
 // If recursive is false, the first change after index at prefix will be sent to the event channel.
 // If index is zero, watch will start from the current index + 1.
 // If index is zero, watch will start from the current index + 1.
 func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
 func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan *Event, *etcdErr.Error) {
-	eventChan := make(chan *Event, 1)
-
-	e, err := wh.EventHistory.scan(prefix, index)
+	events, err := wh.EventHistory.scan(prefix, index)
 
 
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	if e != nil {
-		eventChan <- e
+	eventChan := make(chan *Event, len(events)+5) // use a buffered channel
+
+	if events != nil {
+		for _, e := range events {
+			eventChan <- e
+		}
+
+		if len(events) > 1 {
+			eventChan <- nil
+		}
+
 		return eventChan, nil
 		return eventChan, nil
 	}
 	}
 
 
 	w := &watcher{
 	w := &watcher{
 		eventChan:  eventChan,
 		eventChan:  eventChan,
 		recursive:  recursive,
 		recursive:  recursive,
-		sinceIndex: index - 1, // to catch Expire()
+		sinceIndex: index,
 	}
 	}
 
 
 	l, ok := wh.watchers[prefix]
 	l, ok := wh.watchers[prefix]
@@ -95,19 +104,16 @@ func (wh *watcherHub) notify(e *Event) {
 
 
 func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 	l, ok := wh.watchers[path]
 	l, ok := wh.watchers[path]
-
 	if ok {
 	if ok {
 		curr := l.Front()
 		curr := l.Front()
-		notifiedAll := true
 
 
 		for {
 		for {
 			if curr == nil { // we have reached the end of the list
 			if curr == nil { // we have reached the end of the list
-				if notifiedAll {
+				if l.Len() == 0 {
 					// if we have notified all watcher in the list
 					// if we have notified all watcher in the list
 					// we can delete the list
 					// we can delete the list
 					delete(wh.watchers, path)
 					delete(wh.watchers, path)
 				}
 				}
-
 				break
 				break
 			}
 			}
 
 
@@ -116,20 +122,18 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 			w, _ := curr.Value.(*watcher)
 			w, _ := curr.Value.(*watcher)
 
 
 			if w.notify(e, e.Key == path, deleted) {
 			if w.notify(e, e.Key == path, deleted) {
-				// if we successfully notify a watcher
-				// we need to remove the watcher from the list
-				// and decrease the counter
-				l.Remove(curr)
-				atomic.AddInt64(&wh.count, -1)
 
 
 				if e.Action == Expire {
 				if e.Action == Expire {
-					wh.pendingWatchers.PushBack(w)
+					wh.pendingWatchers[curr] = l
+					wh.pendingList[l] = path
+				} else {
+					// if we successfully notify a watcher
+					// we need to remove the watcher from the list
+					// and decrease the counter
+					l.Remove(curr)
+					atomic.AddInt64(&wh.count, -1)
 				}
 				}
 
 
-			} else {
-				// once there is a watcher in the list is not interested
-				// in the event, we should keep the list in the map
-				notifiedAll = false
 			}
 			}
 
 
 			curr = next // update current to the next
 			curr = next // update current to the next
@@ -138,11 +142,24 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
 }
 }
 
 
 func (wh *watcherHub) clearPendingWatchers() {
 func (wh *watcherHub) clearPendingWatchers() {
-	for e := wh.pendingWatchers.Front(); e != nil; e = e.Next() {
+	if len(wh.pendingWatchers) == 0 { // avoid making new maps
+		return
+	}
+
+	for e, l := range wh.pendingWatchers {
+		l.Remove(e)
+
+		if l.Len() == 0 {
+			path := wh.pendingList[l]
+			delete(wh.watchers, path)
+		}
+
 		w, _ := e.Value.(*watcher)
 		w, _ := e.Value.(*watcher)
 		w.eventChan <- nil
 		w.eventChan <- nil
 	}
 	}
-	wh.pendingWatchers = list.New()
+
+	wh.pendingWatchers = make(map[*list.Element]*list.List)
+	wh.pendingList = make(map[*list.List]string)
 }
 }
 
 
 // clone function clones the watcherHub and return the cloned one.
 // clone function clones the watcherHub and return the cloned one.