event.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package fileSystem
  2. import (
  3. "strings"
  4. "sync"
  5. "time"
  6. )
  7. const (
  8. Get = "get"
  9. Create = "create"
  10. Update = "update"
  11. Delete = "delete"
  12. TestAndSet = "testAndSet"
  13. )
  14. type Event struct {
  15. Action string `json:"action"`
  16. Key string `json:"key, omitempty"`
  17. Dir bool `json:"dir,omitempty"`
  18. PrevValue string `json:"prevValue,omitempty"`
  19. Value string `json:"value,omitempty"`
  20. KVPairs []KeyValuePair `json:"kvs,omitempty"`
  21. Expiration *time.Time `json:"expiration,omitempty"`
  22. TTL int64 `json:"ttl,omitempty"` // Time to live in second
  23. // The command index of the raft machine when the command is executed
  24. Index uint64 `json:"index"`
  25. Term uint64 `json:"term"`
  26. }
  27. // When user list a directory, we add all the node into key-value pair slice
  28. type KeyValuePair struct {
  29. Key string `json:"key, omitempty"`
  30. Value string `json:"value,omitempty"`
  31. Dir bool `json:"dir,omitempty"`
  32. KVPairs []KeyValuePair `json:"kvs,omitempty"`
  33. }
  34. func newEvent(action string, key string, index uint64, term uint64) *Event {
  35. return &Event{
  36. Action: action,
  37. Key: key,
  38. Index: index,
  39. Term: term,
  40. }
  41. }
  42. type eventQueue struct {
  43. events []*Event
  44. size int
  45. front int
  46. capacity int
  47. }
  48. func (eq *eventQueue) back() int {
  49. return (eq.front + eq.size - 1 + eq.capacity) % eq.capacity
  50. }
  51. func (eq *eventQueue) insert(e *Event) {
  52. index := (eq.back() + 1) % eq.capacity
  53. eq.events[index] = e
  54. if eq.size == eq.capacity { //dequeue
  55. eq.front = (index + 1) % eq.capacity
  56. } else {
  57. eq.size++
  58. }
  59. }
  60. type EventHistory struct {
  61. Queue eventQueue
  62. StartIndex uint64
  63. rwl sync.RWMutex
  64. }
  65. func newEventHistory(capacity int) *EventHistory {
  66. return &EventHistory{
  67. Queue: eventQueue{
  68. capacity: capacity,
  69. events: make([]*Event, capacity),
  70. },
  71. }
  72. }
  73. // addEvent function adds event into the eventHistory
  74. func (eh *EventHistory) addEvent(e *Event) {
  75. eh.rwl.Lock()
  76. defer eh.rwl.Unlock()
  77. eh.Queue.insert(e)
  78. eh.StartIndex = eh.Queue.events[eh.Queue.front].Index
  79. }
  80. // scan function is enumerating events from the index in history and
  81. // stops till the first point where the key has identified prefix
  82. func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
  83. eh.rwl.RLock()
  84. defer eh.rwl.RUnlock()
  85. start := index - eh.StartIndex
  86. // the index should locate after the event history's StartIndex
  87. // and before its size
  88. if start < 0 {
  89. // TODO: Add error type
  90. return nil, nil
  91. }
  92. if start >= uint64(eh.Queue.size) {
  93. return nil, nil
  94. }
  95. i := int((start + uint64(eh.Queue.front)) % uint64(eh.Queue.capacity))
  96. for {
  97. e := eh.Queue.events[i]
  98. if strings.HasPrefix(e.Key, prefix) {
  99. return e, nil
  100. }
  101. i = (i + 1) % eh.Queue.capacity
  102. if i == eh.Queue.back() {
  103. // TODO: Add error type
  104. return nil, nil
  105. }
  106. }
  107. }