Browse Source

Merge pull request #413 from philips/event_history

fix(event_history): fix a bug in event queue
Xiang Li 12 years ago
parent
commit
d4553714d9
4 changed files with 35 additions and 18 deletions
  1. 1 1
      server/v2/get_handler.go
  2. 10 8
      store/event_history.go
  3. 4 9
      store/event_queue.go
  4. 20 0
      store/event_test.go

+ 1 - 1
server/v2/get_handler.go

@@ -57,7 +57,7 @@ func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		// Start the watcher on the store.
 		eventChan, err := s.Store().Watch(key, recursive, sinceIndex)
 		if err != nil {
-			return etcdErr.NewError(500, key, s.Store().Index())
+			return err
 		}
 
 		cn, _ := w.(http.CloseNotifier)

+ 10 - 8
store/event_history.go

@@ -39,26 +39,27 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 	return e
 }
 
-// scan function is enumerating events from the index in history and
-// stops till the first point where the key has identified key
+// scan enumerates events from the index history and stops at the first point
+// where the key matches.
 func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 
-	// the index should locate after the event history's StartIndex
-	if index-eh.StartIndex < 0 {
+	// index should be after the event history's StartIndex
+	if index < eh.StartIndex {
 		return nil,
 			etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
 				fmt.Sprintf("the requested history has been cleared [%v/%v]",
 					eh.StartIndex, index), 0)
 	}
 
-	// the index should locate before the size of the queue minus the duplicate count
+	// the index should come before the size of the queue minus the duplicate count
 	if index > eh.LastIndex { // future index
 		return nil, nil
 	}
 
-	i := eh.Queue.Front
+	offset := index - eh.StartIndex
+	i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
 
 	for {
 		e := eh.Queue.Events[i]
@@ -75,13 +76,13 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
 			ok = ok || strings.HasPrefix(e.Node.Key, key)
 		}
 
-		if ok && index <= e.Index() { // make sure we bypass the smaller one
+		if ok {
 			return e, nil
 		}
 
 		i = (i + 1) % eh.Queue.Capacity
 
-		if i > eh.Queue.back() {
+		if i == eh.Queue.Back {
 			return nil, nil
 		}
 	}
@@ -95,6 +96,7 @@ func (eh *EventHistory) clone() *EventHistory {
 		Events:   make([]*Event, eh.Queue.Capacity),
 		Size:     eh.Queue.Size,
 		Front:    eh.Queue.Front,
+		Back:     eh.Queue.Back,
 	}
 
 	for i, e := range eh.Queue.Events {

+ 4 - 9
store/event_queue.go

@@ -4,22 +4,17 @@ type eventQueue struct {
 	Events   []*Event
 	Size     int
 	Front    int
+	Back     int
 	Capacity int
 }
 
-func (eq *eventQueue) back() int {
-	return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
-}
-
 func (eq *eventQueue) insert(e *Event) {
-	index := (eq.back() + 1) % eq.Capacity
-
-	eq.Events[index] = e
+	eq.Events[eq.Back] = e
+	eq.Back = (eq.Back + 1) % eq.Capacity
 
 	if eq.Size == eq.Capacity { //dequeue
-		eq.Front = (index + 1) % eq.Capacity
+		eq.Front = (eq.Front + 1) % eq.Capacity
 	} else {
 		eq.Size++
 	}
-
 }

+ 20 - 0
store/event_test.go

@@ -64,3 +64,23 @@ func TestScanHistory(t *testing.T) {
 		t.Fatalf("bad index shoud reuturn nil")
 	}
 }
+
+// TestFullEventQueue tests a queue with capacity = 10
+// Add 1000 events into that queue, and test if scanning
+// works still for previous events.
+func TestFullEventQueue(t *testing.T) {
+
+	eh := newEventHistory(10)
+
+	// Add
+	for i := 0; i < 1000; i++ {
+		e := newEvent(Create, "/foo", uint64(i), uint64(i))
+		eh.addEvent(e)
+		e, err := eh.scan("/foo", true, uint64(i-1))
+		if i > 0 {
+			if e == nil || err != nil {
+				t.Fatalf("scan error [/foo] [%v] %v", i-1, i)
+			}
+		}
+	}
+}