event_history.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "fmt"
  16. "path"
  17. "strings"
  18. "sync"
  19. etcdErr "github.com/coreos/etcd/error"
  20. )
  21. type EventHistory struct {
  22. Queue eventQueue
  23. StartIndex uint64
  24. LastIndex uint64
  25. rwl sync.RWMutex
  26. }
  27. func newEventHistory(capacity int) *EventHistory {
  28. return &EventHistory{
  29. Queue: eventQueue{
  30. Capacity: capacity,
  31. Events: make([]*Event, capacity),
  32. },
  33. }
  34. }
  35. // addEvent function adds event into the eventHistory
  36. func (eh *EventHistory) addEvent(e *Event) *Event {
  37. eh.rwl.Lock()
  38. defer eh.rwl.Unlock()
  39. eh.Queue.insert(e)
  40. eh.LastIndex = e.Index()
  41. eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index()
  42. return e
  43. }
  44. // scan enumerates events from the index history and stops at the first point
  45. // where the key matches.
  46. func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *etcdErr.Error) {
  47. eh.rwl.RLock()
  48. defer eh.rwl.RUnlock()
  49. // index should be after the event history's StartIndex
  50. if index < eh.StartIndex {
  51. return nil,
  52. etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
  53. fmt.Sprintf("the requested history has been cleared [%v/%v]",
  54. eh.StartIndex, index), 0)
  55. }
  56. // the index should come before the size of the queue minus the duplicate count
  57. if index > eh.LastIndex { // future index
  58. return nil, nil
  59. }
  60. offset := index - eh.StartIndex
  61. i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
  62. for {
  63. e := eh.Queue.Events[i]
  64. ok := (e.Node.Key == key)
  65. if recursive {
  66. // add tailing slash
  67. key := path.Clean(key)
  68. if key[len(key)-1] != '/' {
  69. key = key + "/"
  70. }
  71. ok = ok || strings.HasPrefix(e.Node.Key, key)
  72. }
  73. if ok {
  74. return e, nil
  75. }
  76. i = (i + 1) % eh.Queue.Capacity
  77. if i == eh.Queue.Back {
  78. return nil, nil
  79. }
  80. }
  81. }
  82. // clone will be protected by a stop-world lock
  83. // do not need to obtain internal lock
  84. func (eh *EventHistory) clone() *EventHistory {
  85. clonedQueue := eventQueue{
  86. Capacity: eh.Queue.Capacity,
  87. Events: make([]*Event, eh.Queue.Capacity),
  88. Size: eh.Queue.Size,
  89. Front: eh.Queue.Front,
  90. Back: eh.Queue.Back,
  91. }
  92. for i, e := range eh.Queue.Events {
  93. clonedQueue.Events[i] = e
  94. }
  95. return &EventHistory{
  96. StartIndex: eh.StartIndex,
  97. Queue: clonedQueue,
  98. LastIndex: eh.LastIndex,
  99. }
  100. }