|
|
@@ -140,19 +140,18 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
|
|
|
eh.rwl.RLock()
|
|
|
defer eh.rwl.RUnlock()
|
|
|
|
|
|
- start := index - eh.StartIndex + eh.DupCnt
|
|
|
+ start := index - eh.StartIndex
|
|
|
|
|
|
// the index should locate after the event history's StartIndex
|
|
|
- // and before its size
|
|
|
-
|
|
|
if start < 0 {
|
|
|
- // TODO: Add error type
|
|
|
return nil,
|
|
|
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
|
|
|
- fmt.Sprintf("prefix:%v index:%v", prefix, index))
|
|
|
+ fmt.Sprintf("the requested history has been cleared [%v/%v]",
|
|
|
+ eh.StartIndex, index))
|
|
|
}
|
|
|
|
|
|
- if start >= uint64(eh.Queue.Size) {
|
|
|
+ // the index should locate before the size of the queue minus the duplicate count
|
|
|
+ if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
|
|
|
return nil, nil
|
|
|
}
|
|
|
|
|
|
@@ -160,14 +159,13 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
|
|
|
|
|
|
for {
|
|
|
e := eh.Queue.Events[i]
|
|
|
- if strings.HasPrefix(e.Key, prefix) {
|
|
|
+ if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
|
|
|
return e, nil
|
|
|
}
|
|
|
|
|
|
i = (i + 1) % eh.Queue.Capacity
|
|
|
|
|
|
- if i == eh.Queue.back() {
|
|
|
- // TODO: Add error type
|
|
|
+ if i == eh.Queue.back() { // find nothing, return and watch from current index
|
|
|
return nil, nil
|
|
|
}
|
|
|
}
|