// event.go

  1. package fileSystem
  2. import (
  3. "strings"
  4. "sync"
  5. "time"
  6. )
  7. const (
  8. Get = "get"
  9. Set = "set"
  10. Delete = "delete"
  11. TestAndSet = "testAndSet"
  12. TestIAndSet = "testiAndSet"
  13. )
  14. type Event struct {
  15. Action string `json:"action"`
  16. Key string `json:"key, omitempty"`
  17. Dir bool `json:"dir,omitempty"`
  18. PrevValue string `json:"prevValue,omitempty"`
  19. Value string `json:"value,omitempty"`
  20. KVPairs []KeyValuePair `json:"kvs,omitempty"`
  21. Expiration *time.Time `json:"expiration,omitempty"`
  22. TTL int64 `json:"ttl,omitempty"` // Time to live in second
  23. // The command index of the raft machine when the command is executed
  24. Index uint64 `json:"index"`
  25. Term uint64 `json:"term"`
  26. }
  27. // When user list a directory, we add all the node into key-value pair slice
  28. type KeyValuePair struct {
  29. Key string `json:"key, omitempty"`
  30. Value string `json:"value,omitempty"`
  31. Dir bool `json:"dir,omitempty"`
  32. KVPairs []KeyValuePair `json:"kvs,omitempty"`
  33. }
  34. func newEvent(action string, key string, index uint64, term uint64) *Event {
  35. return &Event{
  36. Action: action,
  37. Key: key,
  38. Index: index,
  39. Term: term,
  40. }
  41. }
  42. type eventQueue struct {
  43. events []*Event
  44. size int
  45. front int
  46. back int
  47. capacity int
  48. }
  49. func (eq *eventQueue) insert(e *Event) bool {
  50. eq.back = (eq.back + 1) % eq.capacity
  51. eq.events[eq.back] = e
  52. if eq.size == eq.capacity { //dequeue
  53. eq.front = (eq.back + 1) % eq.capacity
  54. return true
  55. } else {
  56. eq.size++
  57. return false
  58. }
  59. }
  60. type EventHistory struct {
  61. Queue eventQueue
  62. StartIndex uint64
  63. rwl sync.RWMutex
  64. }
  65. func newEventHistory(capacity int) *EventHistory {
  66. return &EventHistory{
  67. Queue: eventQueue{
  68. capacity: capacity,
  69. events: make([]*Event, capacity),
  70. back: -1,
  71. },
  72. }
  73. }
  74. // addEvent function adds event into the eventHistory
  75. func (eh *EventHistory) addEvent(e *Event) {
  76. eh.rwl.Lock()
  77. defer eh.rwl.Unlock()
  78. if eh.Queue.insert(e) {
  79. eh.StartIndex++
  80. } else {
  81. eh.StartIndex = eh.Queue.events[eh.Queue.front].Index
  82. }
  83. }
  84. func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
  85. eh.rwl.RLock()
  86. defer eh.rwl.RUnlock()
  87. start := index - eh.StartIndex
  88. if start < 0 {
  89. // TODO: Add error type
  90. return nil, nil
  91. }
  92. if start >= uint64(eh.Queue.size) {
  93. return nil, nil
  94. }
  95. i := int((start + uint64(eh.Queue.front)) % uint64(eh.Queue.capacity))
  96. for {
  97. e := eh.Queue.events[i]
  98. if strings.HasPrefix(e.Key, prefix) {
  99. return e, nil
  100. }
  101. i = (i + 1) % eh.Queue.capacity
  102. if i == eh.Queue.back {
  103. // TODO: Add error type
  104. return nil, nil
  105. }
  106. }
  107. }