event.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package fileSystem
  2. import (
  3. "strings"
  4. "sync"
  5. "time"
  6. )
  // Names of the actions this package defines. They are presumably the values
  // stored in Event.Action; no use is visible in this part of the file.
  const (
  	// Get names a read.
  	Get = "get"
  	// Create names the creation of a new entry.
  	Create = "create"
  	// Update names a change to an existing entry.
  	Update = "update"
  	// Delete names a removal.
  	Delete = "delete"
  	// TestAndSet names a conditional (compare-then-write) change.
  	TestAndSet = "testAndSet"
  )
  14. type Event struct {
  15. Action string `json:"action"`
  16. Key string `json:"key, omitempty"`
  17. Dir bool `json:"dir,omitempty"`
  18. PrevValue string `json:"prevValue,omitempty"`
  19. Value string `json:"value,omitempty"`
  20. KVPairs []KeyValuePair `json:"kvs,omitempty"`
  21. Expiration *time.Time `json:"expiration,omitempty"`
  22. TTL int64 `json:"ttl,omitempty"` // Time to live in second
  23. // The command index of the raft machine when the command is executed
  24. Index uint64 `json:"index"`
  25. Term uint64 `json:"term"`
  26. }
  // KeyValuePair is one entry of a directory listing: when a user lists a
  // directory, every node in it is added to a slice of these pairs.
  //
  // KVPairs nests further pairs, so a listing can describe a whole subtree.
  type KeyValuePair struct {
  	// Key is omitted from the JSON when empty. The tag must not contain a
  	// space after the comma: encoding/json matches option names exactly, so
  	// "key, omitempty" would silently disable omitempty.
  	Key string `json:"key,omitempty"`
  	// Value is omitted from the JSON when empty.
  	Value string `json:"value,omitempty"`
  	// Dir is serialized only when true.
  	Dir bool `json:"dir,omitempty"`
  	// KVPairs holds nested entries and is omitted when empty.
  	KVPairs []KeyValuePair `json:"kvs,omitempty"`
  }
  34. // interfaces for sort
  35. func (e Event) Len() int {
  36. return len(e.KVPairs)
  37. }
  38. func (e Event) Less(i, j int) bool {
  39. return e.KVPairs[i].Key < e.KVPairs[j].Key
  40. }
  41. func (e Event) Swap(i, j int) {
  42. e.KVPairs[i], e.KVPairs[j] = e.KVPairs[j], e.KVPairs[i]
  43. }
  44. func newEvent(action string, key string, index uint64, term uint64) *Event {
  45. return &Event{
  46. Action: action,
  47. Key: key,
  48. Index: index,
  49. Term: term,
  50. }
  51. }
  52. type eventQueue struct {
  53. events []*Event
  54. size int
  55. front int
  56. capacity int
  57. }
  58. func (eq *eventQueue) back() int {
  59. return (eq.front + eq.size - 1 + eq.capacity) % eq.capacity
  60. }
  61. func (eq *eventQueue) insert(e *Event) {
  62. index := (eq.back() + 1) % eq.capacity
  63. eq.events[index] = e
  64. if eq.size == eq.capacity { //dequeue
  65. eq.front = (index + 1) % eq.capacity
  66. } else {
  67. eq.size++
  68. }
  69. }
  70. type EventHistory struct {
  71. Queue eventQueue
  72. StartIndex uint64
  73. rwl sync.RWMutex
  74. }
  75. func newEventHistory(capacity int) *EventHistory {
  76. return &EventHistory{
  77. Queue: eventQueue{
  78. capacity: capacity,
  79. events: make([]*Event, capacity),
  80. },
  81. }
  82. }
  83. // addEvent function adds event into the eventHistory
  84. func (eh *EventHistory) addEvent(e *Event) {
  85. eh.rwl.Lock()
  86. defer eh.rwl.Unlock()
  87. eh.Queue.insert(e)
  88. eh.StartIndex = eh.Queue.events[eh.Queue.front].Index
  89. }
  90. // scan function is enumerating events from the index in history and
  91. // stops till the first point where the key has identified prefix
  92. func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
  93. eh.rwl.RLock()
  94. defer eh.rwl.RUnlock()
  95. start := index - eh.StartIndex
  96. // the index should locate after the event history's StartIndex
  97. // and before its size
  98. if start < 0 {
  99. // TODO: Add error type
  100. return nil, nil
  101. }
  102. if start >= uint64(eh.Queue.size) {
  103. return nil, nil
  104. }
  105. i := int((start + uint64(eh.Queue.front)) % uint64(eh.Queue.capacity))
  106. for {
  107. e := eh.Queue.events[i]
  108. if strings.HasPrefix(e.Key, prefix) {
  109. return e, nil
  110. }
  111. i = (i + 1) % eh.Queue.capacity
  112. if i == eh.Queue.back() {
  113. // TODO: Add error type
  114. return nil, nil
  115. }
  116. }
  117. }