event_history.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package store
  2. import (
  3. "fmt"
  4. "path"
  5. "strings"
  6. "sync"
  7. etcdErr "github.com/coreos/etcd/error"
  8. )
  9. type EventHistory struct {
  10. Queue eventQueue
  11. StartIndex uint64
  12. LastIndex uint64
  13. rwl sync.RWMutex
  14. }
  15. func newEventHistory(capacity int) *EventHistory {
  16. return &EventHistory{
  17. Queue: eventQueue{
  18. Capacity: capacity,
  19. Events: make([]*Event, capacity),
  20. },
  21. }
  22. }
  23. // addEvent function adds event into the eventHistory
  24. func (eh *EventHistory) addEvent(e *Event) *Event {
  25. eh.rwl.Lock()
  26. defer eh.rwl.Unlock()
  27. eh.Queue.insert(e)
  28. eh.LastIndex = e.Index()
  29. eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index()
  30. return e
  31. }
  32. // scan enumerates events from the index history and stops at the first point
  33. // where the key matches.
  34. func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
  35. eh.rwl.RLock()
  36. defer eh.rwl.RUnlock()
  37. // index should be after the event history's StartIndex
  38. if index < eh.StartIndex {
  39. return nil,
  40. etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
  41. fmt.Sprintf("the requested history has been cleared [%v/%v]",
  42. eh.StartIndex, index), 0)
  43. }
  44. // the index should come before the size of the queue minus the duplicate count
  45. if index > eh.LastIndex { // future index
  46. return nil, nil
  47. }
  48. offset := index - eh.StartIndex
  49. i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
  50. for {
  51. e := eh.Queue.Events[i]
  52. ok := (e.Node.Key == key)
  53. if recursive {
  54. // add tailing slash
  55. key := path.Clean(key)
  56. if key[len(key)-1] != '/' {
  57. key = key + "/"
  58. }
  59. ok = ok || strings.HasPrefix(e.Node.Key, key)
  60. }
  61. if ok {
  62. return e, nil
  63. }
  64. i = (i + 1) % eh.Queue.Capacity
  65. if i == eh.Queue.Back {
  66. return nil, nil
  67. }
  68. }
  69. }
  70. // clone will be protected by a stop-world lock
  71. // do not need to obtain internal lock
  72. func (eh *EventHistory) clone() *EventHistory {
  73. clonedQueue := eventQueue{
  74. Capacity: eh.Queue.Capacity,
  75. Events: make([]*Event, eh.Queue.Capacity),
  76. Size: eh.Queue.Size,
  77. Front: eh.Queue.Front,
  78. Back: eh.Queue.Back,
  79. }
  80. for i, e := range eh.Queue.Events {
  81. clonedQueue.Events[i] = e
  82. }
  83. return &EventHistory{
  84. StartIndex: eh.StartIndex,
  85. Queue: clonedQueue,
  86. LastIndex: eh.LastIndex,
  87. }
  88. }