store.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "strconv"
  7. "time"
  8. )
  9. //------------------------------------------------------------------------------
  10. //
  11. // Typedefs
  12. //
  13. //------------------------------------------------------------------------------
  14. // The main struct of the Key-Value store
  15. type Store struct {
  16. // key-value store structure
  17. Tree *tree
  18. // WatcherHub is where we register all the clients
  19. // who issue a watch request
  20. watcher *WatcherHub
  21. // The string channel to send messages to the outside world
  22. // Now we use it to send changes to the hub of the web service
  23. messager *chan string
  24. // A map to keep the recent response to the clients
  25. ResponseMap map[string]Response
  26. // The max number of the recent responses we can record
  27. ResponseMaxSize int
  28. // The current number of the recent responses we have recorded
  29. ResponseCurrSize uint
  30. // The index of the first recent responses we have
  31. ResponseStartIndex uint64
  32. // Current index of the raft machine
  33. Index uint64
  34. }
  35. // A Node represents a Value in the Key-Value pair in the store
  36. // It has its value, expire time and a channel used to update the
  37. // expire time (since we do countdown in a go routine, we need to
  38. // communicate with it via channel)
  39. type Node struct {
  40. // The string value of the node
  41. Value string `json:"value"`
  42. // If the node is a permanent one the ExprieTime will be Unix(0,0)
  43. // Otherwise after the expireTime, the node will be deleted
  44. ExpireTime time.Time `json:"expireTime"`
  45. // A channel to update the expireTime of the node
  46. update chan time.Time `json:"-"`
  47. }
  48. // The response from the store to the user who issue a command
  49. type Response struct {
  50. Action string `json:"action"`
  51. Key string `json:"key"`
  52. PrevValue string `json:"prevValue"`
  53. Value string `json:"value"`
  54. // If the key existed before the action, this field should be true
  55. // If the key did not exist before the action, this field should be false
  56. Exist bool `json:"exist"`
  57. Expiration time.Time `json:"expiration"`
  58. // Time to live in second
  59. TTL int64 `json:"ttl"`
  60. // The command index of the raft machine when the command is executed
  61. Index uint64 `json:"index"`
  62. }
  63. // A listNode represent the simplest Key-Value pair with its type
  64. // It is only used when do list opeartion
  65. // We want to have a file system like store, thus we distingush "file"
  66. // and "directory"
  67. type ListNode struct {
  68. Key string
  69. Value string
  70. Type string
  71. }
  72. var PERMANENT = time.Unix(0, 0)
  73. //------------------------------------------------------------------------------
  74. //
  75. // Methods
  76. //
  77. //------------------------------------------------------------------------------
  78. // Create a new stroe
  79. // Arguement max is the max number of response we want to record
  80. func CreateStore(max int) *Store {
  81. s := new(Store)
  82. s.messager = nil
  83. s.ResponseMap = make(map[string]Response)
  84. s.ResponseStartIndex = 0
  85. s.ResponseMaxSize = max
  86. s.ResponseCurrSize = 0
  87. s.Tree = &tree{
  88. &treeNode{
  89. Node{
  90. "/",
  91. time.Unix(0, 0),
  92. nil,
  93. },
  94. true,
  95. make(map[string]*treeNode),
  96. },
  97. }
  98. s.watcher = createWatcherHub()
  99. return s
  100. }
  101. // Set the messager of the store
  102. func (s *Store) SetMessager(messager *chan string) {
  103. s.messager = messager
  104. }
  105. // Set the key to value with expiration time
  106. func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  107. //Update index
  108. s.Index = index
  109. key = path.Clean("/" + key)
  110. isExpire := !expireTime.Equal(PERMANENT)
  111. // When the slow follower receive the set command
  112. // the key may be expired, we should not add the node
  113. // also if the node exist, we need to delete the node
  114. if isExpire && expireTime.Sub(time.Now()) < 0 {
  115. return s.Delete(key, index)
  116. }
  117. var TTL int64
  118. // Update ttl
  119. if isExpire {
  120. TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  121. } else {
  122. // For permanent value, we set ttl to -1
  123. TTL = -1
  124. }
  125. // Get the node
  126. node, ok := s.Tree.get(key)
  127. if ok {
  128. // Update when node exists
  129. // Node is not permanent
  130. if !node.ExpireTime.Equal(PERMANENT) {
  131. // If node is not permanent
  132. // Update its expireTime
  133. node.update <- expireTime
  134. } else {
  135. // If we want the permanent node to have expire time
  136. // We need to create create a go routine with a channel
  137. if isExpire {
  138. node.update = make(chan time.Time)
  139. go s.monitorExpiration(key, node.update, expireTime)
  140. }
  141. }
  142. // Update the information of the node
  143. s.Tree.set(key, Node{value, expireTime, node.update})
  144. resp := Response{"SET", key, node.Value, value, true, expireTime, TTL, index}
  145. s.watcher.notify(resp)
  146. msg, err := json.Marshal(resp)
  147. // Send to the messager
  148. if s.messager != nil && err == nil {
  149. *s.messager <- string(msg)
  150. }
  151. s.addToResponseMap(index, &resp)
  152. return msg, err
  153. // Add new node
  154. } else {
  155. update := make(chan time.Time)
  156. s.Tree.set(key, Node{value, expireTime, update})
  157. if isExpire {
  158. go s.monitorExpiration(key, update, expireTime)
  159. }
  160. resp := Response{"SET", key, "", value, false, expireTime, TTL, index}
  161. msg, err := json.Marshal(resp)
  162. // Nofity the watcher
  163. s.watcher.notify(resp)
  164. // Send to the messager
  165. if s.messager != nil && err == nil {
  166. *s.messager <- string(msg)
  167. }
  168. s.addToResponseMap(index, &resp)
  169. return msg, err
  170. }
  171. }
  172. // Get the value of the key
  173. func (s *Store) Get(key string) Response {
  174. key = path.Clean("/" + key)
  175. node, ok := s.Tree.get(key)
  176. if ok {
  177. var TTL int64
  178. var isExpire bool = false
  179. isExpire = !node.ExpireTime.Equal(PERMANENT)
  180. // Update ttl
  181. if isExpire {
  182. TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  183. } else {
  184. TTL = -1
  185. }
  186. return Response{"GET", key, node.Value, node.Value, true, node.ExpireTime, TTL, s.Index}
  187. } else {
  188. // we do not found the key
  189. return Response{"GET", key, "", "", false, time.Unix(0, 0), 0, s.Index}
  190. }
  191. }
  192. // List all the item in the prefix
  193. func (s *Store) List(prefix string) ([]byte, error) {
  194. nodes, keys, dirs, ok := s.Tree.list(prefix)
  195. var ln []ListNode
  196. if ok {
  197. ln = make([]ListNode, len(nodes))
  198. for i := 0; i < len(nodes); i++ {
  199. ln[i] = ListNode{keys[i], nodes[i].Value, dirs[i]}
  200. }
  201. }
  202. return json.Marshal(ln)
  203. }
  204. // Delete the key
  205. func (s *Store) Delete(key string, index uint64) ([]byte, error) {
  206. key = path.Clean("/" + key)
  207. //Update index
  208. s.Index = index
  209. node, ok := s.Tree.get(key)
  210. if ok {
  211. if node.ExpireTime.Equal(PERMANENT) {
  212. s.Tree.delete(key)
  213. } else {
  214. // Kill the expire go routine
  215. node.update <- PERMANENT
  216. s.Tree.delete(key)
  217. }
  218. resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, index}
  219. msg, err := json.Marshal(resp)
  220. s.watcher.notify(resp)
  221. // notify the messager
  222. if s.messager != nil && err == nil {
  223. *s.messager <- string(msg)
  224. }
  225. s.addToResponseMap(index, &resp)
  226. return msg, err
  227. } else {
  228. resp := Response{"DELETE", key, "", "", false, time.Unix(0, 0), 0, index}
  229. s.addToResponseMap(index, &resp)
  230. return json.Marshal(resp)
  231. }
  232. }
  233. // Set the value of the key to the value if the given prevValue is equal to the value of the key
  234. func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  235. resp := s.Get(key)
  236. if resp.PrevValue == prevValue {
  237. // If test success, do set
  238. return s.Set(key, value, expireTime, index)
  239. } else {
  240. // If fails, return the result of get which contains the current
  241. // status of the key-value pair
  242. return json.Marshal(resp)
  243. }
  244. }
  245. // Add a channel to the watchHub.
  246. // The watchHub will send response to the channel when any key under the prefix
  247. // changes [since the sinceIndex if given]
  248. func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
  249. return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, &s.ResponseMap)
  250. }
  251. // This function should be created as a go routine to delete the key-value pair
  252. // when it reaches expiration time
  253. func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
  254. duration := expireTime.Sub(time.Now())
  255. for {
  256. select {
  257. // Timeout delete the node
  258. case <-time.After(duration):
  259. node, ok := s.Tree.get(key)
  260. if !ok {
  261. return
  262. } else {
  263. s.Tree.delete(key)
  264. resp := Response{"DELETE", key, node.Value, "", true, node.ExpireTime, 0, s.Index}
  265. msg, err := json.Marshal(resp)
  266. s.watcher.notify(resp)
  267. // notify the messager
  268. if s.messager != nil && err == nil {
  269. *s.messager <- string(msg)
  270. }
  271. return
  272. }
  273. case updateTime := <-update:
  274. // Update duration
  275. // If the node become a permanent one, the go routine is
  276. // not needed
  277. if updateTime.Equal(PERMANENT) {
  278. return
  279. }
  280. // Update duration
  281. duration = updateTime.Sub(time.Now())
  282. }
  283. }
  284. }
  285. // When we receive a command that will change the state of the key-value store
  286. // We will add the result of it to the ResponseMap for the use of watch command
  287. // Also we may remove the oldest response when we add new one
  288. func (s *Store) addToResponseMap(index uint64, resp *Response) {
  289. // zero case
  290. if s.ResponseMaxSize == 0 {
  291. return
  292. }
  293. strIndex := strconv.FormatUint(index, 10)
  294. s.ResponseMap[strIndex] = *resp
  295. // unlimited
  296. if s.ResponseMaxSize < 0 {
  297. s.ResponseCurrSize++
  298. return
  299. }
  300. // if we reach the max point, we need to delete the most latest
  301. // response and update the startIndex
  302. if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
  303. s.ResponseStartIndex++
  304. delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
  305. } else {
  306. s.ResponseCurrSize++
  307. }
  308. }
  309. // Save the current state of the storage system
  310. func (s *Store) Save() ([]byte, error) {
  311. b, err := json.Marshal(s)
  312. if err != nil {
  313. fmt.Println(err)
  314. return nil, err
  315. }
  316. return b, nil
  317. }
  318. // Recovery the state of the stroage system from a previous state
  319. func (s *Store) Recovery(state []byte) error {
  320. err := json.Unmarshal(state, s)
  321. // The only thing need to change after the recovery is the
  322. // node with expiration time, we need to delete all the node
  323. // that have been expired and setup go routines to monitor the
  324. // other ones
  325. s.checkExpiration()
  326. return err
  327. }
  328. // Clean the expired nodes
  329. // Set up go routines to mon
  330. func (s *Store) checkExpiration() {
  331. s.Tree.traverse(s.checkNode, false)
  332. }
  333. // Check each node
  334. func (s *Store) checkNode(key string, node *Node) {
  335. if node.ExpireTime.Equal(PERMANENT) {
  336. return
  337. } else {
  338. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  339. node.update = make(chan time.Time)
  340. go s.monitorExpiration(key, node.update, node.ExpireTime)
  341. } else {
  342. // we should delete this node
  343. s.Tree.delete(key)
  344. }
  345. }
  346. }