event.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package store
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "time"
  7. etcdErr "github.com/coreos/etcd/error"
  8. )
  9. const (
  10. Get = "get"
  11. Create = "create"
  12. Update = "update"
  13. Delete = "delete"
  14. TestAndSet = "testAndSet"
  15. Expire = "expire"
  16. UndefIndex = 0
  17. UndefTerm = 0
  18. )
  19. type Event struct {
  20. Action string `json:"action"`
  21. Key string `json:"key, omitempty"`
  22. Dir bool `json:"dir,omitempty"`
  23. PrevValue string `json:"prevValue,omitempty"`
  24. Value string `json:"value,omitempty"`
  25. KVPairs []KeyValuePair `json:"kvs,omitempty"`
  26. Expiration *time.Time `json:"expiration,omitempty"`
  27. TTL int64 `json:"ttl,omitempty"` // Time to live in second
  28. // The command index of the raft machine when the command is executed
  29. Index uint64 `json:"index"`
  30. Term uint64 `json:"term"`
  31. }
  32. // When user list a directory, we add all the node into key-value pair slice
  33. type KeyValuePair struct {
  34. Key string `json:"key, omitempty"`
  35. Value string `json:"value,omitempty"`
  36. Dir bool `json:"dir,omitempty"`
  37. KVPairs []KeyValuePair `json:"kvs,omitempty"`
  38. }
  39. // interfaces for sorting
  40. func (k KeyValuePair) Len() int {
  41. return len(k.KVPairs)
  42. }
  43. func (k KeyValuePair) Less(i, j int) bool {
  44. return k.KVPairs[i].Key < k.KVPairs[j].Key
  45. }
  46. func (k KeyValuePair) Swap(i, j int) {
  47. k.KVPairs[i], k.KVPairs[j] = k.KVPairs[j], k.KVPairs[i]
  48. }
  49. func newEvent(action string, key string, index uint64, term uint64) *Event {
  50. return &Event{
  51. Action: action,
  52. Key: key,
  53. Index: index,
  54. Term: term,
  55. }
  56. }
  57. type eventQueue struct {
  58. Events []*Event
  59. Size int
  60. Front int
  61. Capacity int
  62. }
  63. func (eq *eventQueue) back() int {
  64. return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
  65. }
  66. func (eq *eventQueue) insert(e *Event) {
  67. index := (eq.back() + 1) % eq.Capacity
  68. eq.Events[index] = e
  69. if eq.Size == eq.Capacity { //dequeue
  70. eq.Front = (index + 1) % eq.Capacity
  71. } else {
  72. eq.Size++
  73. }
  74. }
  75. type EventHistory struct {
  76. Queue eventQueue
  77. StartIndex uint64
  78. LastIndex uint64
  79. LastTerm uint64
  80. DupIndex uint64
  81. rwl sync.RWMutex
  82. }
  83. func newEventHistory(capacity int) *EventHistory {
  84. return &EventHistory{
  85. Queue: eventQueue{
  86. Capacity: capacity,
  87. Events: make([]*Event, capacity),
  88. },
  89. }
  90. }
  91. // addEvent function adds event into the eventHistory
  92. func (eh *EventHistory) addEvent(e *Event) *Event {
  93. eh.rwl.Lock()
  94. defer eh.rwl.Unlock()
  95. DupIndex := uint64(0)
  96. if e.Index == UndefIndex {
  97. e.Index = eh.LastIndex
  98. DupIndex = 1
  99. }
  100. if e.Term == UndefTerm {
  101. e.Term = eh.LastTerm
  102. DupIndex = 1
  103. }
  104. eh.Queue.insert(e)
  105. eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
  106. eh.LastIndex = e.Index
  107. eh.LastTerm = e.Term
  108. eh.DupIndex += DupIndex
  109. return e
  110. }
  111. // scan function is enumerating events from the index in history and
  112. // stops till the first point where the key has identified prefix
  113. func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
  114. eh.rwl.RLock()
  115. defer eh.rwl.RUnlock()
  116. start := index - eh.StartIndex + eh.DupIndex
  117. // the index should locate after the event history's StartIndex
  118. // and before its size
  119. if start < 0 {
  120. // TODO: Add error type
  121. return nil,
  122. etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
  123. fmt.Sprintf("prefix:%v index:%v", prefix, index))
  124. }
  125. if start >= uint64(eh.Queue.Size) {
  126. return nil, nil
  127. }
  128. i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
  129. for {
  130. e := eh.Queue.Events[i]
  131. if strings.HasPrefix(e.Key, prefix) {
  132. return e, nil
  133. }
  134. i = (i + 1) % eh.Queue.Capacity
  135. if i == eh.Queue.back() {
  136. // TODO: Add error type
  137. return nil, nil
  138. }
  139. }
  140. }
  141. // clone will be protected by a stop-world lock
  142. // do not need to obtain internal lock
  143. func (eh *EventHistory) clone() *EventHistory {
  144. clonedQueue := eventQueue{
  145. Capacity: eh.Queue.Capacity,
  146. Events: make([]*Event, eh.Queue.Capacity),
  147. Size: eh.Queue.Size,
  148. Front: eh.Queue.Front,
  149. }
  150. for i, e := range eh.Queue.Events {
  151. clonedQueue.Events[i] = e
  152. }
  153. return &EventHistory{
  154. StartIndex: eh.StartIndex,
  155. Queue: clonedQueue,
  156. }
  157. }