event_history.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package store
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. etcdErr "github.com/coreos/etcd/error"
  7. )
  8. type EventHistory struct {
  9. Queue eventQueue
  10. StartIndex uint64
  11. LastIndex uint64
  12. LastTerm uint64
  13. DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue
  14. rwl sync.RWMutex
  15. }
  16. func newEventHistory(capacity int) *EventHistory {
  17. return &EventHistory{
  18. Queue: eventQueue{
  19. Capacity: capacity,
  20. Events: make([]*Event, capacity),
  21. },
  22. }
  23. }
  24. // addEvent function adds event into the eventHistory
  25. func (eh *EventHistory) addEvent(e *Event) *Event {
  26. eh.rwl.Lock()
  27. defer eh.rwl.Unlock()
  28. var duped uint64
  29. if e.Index == UndefIndex {
  30. e.Index = eh.LastIndex
  31. e.Term = eh.LastTerm
  32. duped = 1
  33. }
  34. eh.Queue.insert(e)
  35. eh.LastIndex = e.Index
  36. eh.LastTerm = e.Term
  37. eh.DupCnt += duped
  38. eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
  39. return e
  40. }
  41. // scan function is enumerating events from the index in history and
  42. // stops till the first point where the key has identified prefix
  43. func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
  44. eh.rwl.RLock()
  45. defer eh.rwl.RUnlock()
  46. start := index - eh.StartIndex
  47. // the index should locate after the event history's StartIndex
  48. if start < 0 {
  49. return nil,
  50. etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
  51. fmt.Sprintf("the requested history has been cleared [%v/%v]",
  52. eh.StartIndex, index), UndefIndex, UndefTerm)
  53. }
  54. // the index should locate before the size of the queue minus the duplicate count
  55. if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
  56. return nil, nil
  57. }
  58. i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
  59. for {
  60. e := eh.Queue.Events[i]
  61. if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
  62. return e, nil
  63. }
  64. i = (i + 1) % eh.Queue.Capacity
  65. if i == eh.Queue.back() { // find nothing, return and watch from current index
  66. return nil, nil
  67. }
  68. }
  69. }
  70. // clone will be protected by a stop-world lock
  71. // do not need to obtain internal lock
  72. func (eh *EventHistory) clone() *EventHistory {
  73. clonedQueue := eventQueue{
  74. Capacity: eh.Queue.Capacity,
  75. Events: make([]*Event, eh.Queue.Capacity),
  76. Size: eh.Queue.Size,
  77. Front: eh.Queue.Front,
  78. }
  79. for i, e := range eh.Queue.Events {
  80. clonedQueue.Events[i] = e
  81. }
  82. return &EventHistory{
  83. StartIndex: eh.StartIndex,
  84. Queue: clonedQueue,
  85. LastIndex: eh.LastIndex,
  86. LastTerm: eh.LastTerm,
  87. DupCnt: eh.DupCnt,
  88. }
  89. }