Browse Source

recursive watch

Xiang Li 12 years ago
parent
commit
ea4ab2a429
2 changed files with 75 additions and 12 deletions
  1. 50 11
      file_system/watcher.go
  2. 25 1
      file_system/watcher_test.go

+ 50 - 11
file_system/watcher.go

@@ -12,6 +12,11 @@ type watcherHub struct {
 	EventHistory *EventHistory
 }
 
+type watcher struct {
+	eventChan chan *Event
+	recursive bool
+}
+
 func newWatchHub(capacity int) *watcherHub {
 	return &watcherHub{
 		watchers:     make(map[string]*list.List),
@@ -19,7 +24,11 @@ func newWatchHub(capacity int) *watcherHub {
 	}
 }
 
-func (wh *watcherHub) watch(prefix string, index uint64) (error, <-chan *Event) {
+// watch function returns an Event channel.
+// If recursive is true, the first change after index under prefix will be sent to the event channel.
+// If recursive is false, the first change after index at prefix will be sent to the event channel.
+// If index is zero, watch will start from the current index + 1.
+func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (error, <-chan *Event) {
 	eventChan := make(chan *Event, 1)
 
 	e, err := wh.EventHistory.scan(prefix, index)
@@ -33,13 +42,19 @@ func (wh *watcherHub) watch(prefix string, index uint64) (error, <-chan *Event)
 		return nil, eventChan
 	}
 
+	w := &watcher{
+		eventChan: eventChan,
+		recursive: recursive,
+	}
+
 	l, ok := wh.watchers[prefix]
 
-	if ok {
-		l.PushBack(eventChan)
-	} else {
+	if ok { // add the new watcher to the back of the list
+		l.PushBack(w)
+
+	} else { // create a new list and add the new watcher
 		l := list.New()
-		l.PushBack(eventChan)
+		l.PushBack(w)
 		wh.watchers[prefix] = l
 	}
 
@@ -59,15 +74,39 @@ func (wh *watcherHub) notify(e *Event) {
 
 		if ok {
 
+			curr := l.Front()
+			notifiedAll := true
+
 			for {
-				element := l.Front()
-				if element == nil {
-					delete(wh.watchers, currPath)
+
+				if curr == nil { // we have reached the end of the list
+
+					if notifiedAll {
+						// if we have notified all watcher in the list
+						// we can delete the list
+						delete(wh.watchers, currPath)
+					}
 					break
 				}
-				c, _ := element.Value.(chan *Event)
-				c <- e
-				l.Remove(element)
+
+				next := curr.Next() // save the next
+
+				w, _ := curr.Value.(*watcher)
+
+				if w.recursive {
+					w.eventChan <- e
+					l.Remove(curr)
+				} else {
+					if e.Key == currPath { // only notify the same path
+						w.eventChan <- e
+						l.Remove(curr)
+					} else { // we do not notify all watcher in the list
+						notifiedAll = false
+					}
+				}
+
+				curr = next // go to the next one
+
 			}
 		}
 

+ 25 - 1
file_system/watcher_test.go

@@ -6,7 +6,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	wh := newWatchHub(100)
-	err, c := wh.watch("/foo", 0)
+	err, c := wh.watch("/foo", true, 0)
 
 	if err != nil {
 		t.Fatal("%v", err)
@@ -28,4 +28,28 @@ func TestWatch(t *testing.T) {
 	if e != re {
 		t.Fatal("recv != send")
 	}
+
+	_, c = wh.watch("/foo", false, 0)
+
+	e = newEvent(Set, "/foo/bar", 1, 0)
+
+	wh.notify(e)
+
+	select {
+	case <-c:
+		t.Fatal("should not receive from channel if not recursive")
+	default:
+		// do nothing
+	}
+
+	e = newEvent(Set, "/foo", 1, 0)
+
+	wh.notify(e)
+
+	re = <-c
+
+	if e != re {
+		t.Fatal("recv != send")
+	}
+
 }