store.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "strconv"
  7. "time"
  8. )
  9. //------------------------------------------------------------------------------
  10. //
  11. // Typedefs
  12. //
  13. //------------------------------------------------------------------------------
  14. // The main struct of the Key-Value store
  15. type Store struct {
  16. // key-value store structure
  17. Tree *tree
  18. // WatcherHub is where we register all the clients
  19. // who issue a watch request
  20. watcher *WatcherHub
  21. // The string channel to send messages to the outside world
  22. // Now we use it to send changes to the hub of the web service
  23. messager *chan string
  24. // A map to keep the recent response to the clients
  25. ResponseMap map[string]Response
  26. // The max number of the recent responses we can record
  27. ResponseMaxSize int
  28. // The current number of the recent responses we have recorded
  29. ResponseCurrSize uint
  30. // The index of the first recent responses we have
  31. ResponseStartIndex uint64
  32. // Current index of the raft machine
  33. Index uint64
  34. }
  35. // A Node represents a Value in the Key-Value pair in the store
  36. // It has its value, expire time and a channel used to update the
  37. // expire time (since we do countdown in a go routine, we need to
  38. // communicate with it via channel)
  39. type Node struct {
  40. // The string value of the node
  41. Value string `json:"value"`
  42. // If the node is a permanent one the ExprieTime will be Unix(0,0)
  43. // Otherwise after the expireTime, the node will be deleted
  44. ExpireTime time.Time `json:"expireTime"`
  45. // A channel to update the expireTime of the node
  46. update chan time.Time `json:"-"`
  47. }
  48. // The response from the store to the user who issue a command
  49. type Response struct {
  50. Action string `json:"action"`
  51. Key string `json:"key"`
  52. PrevValue string `json:"prevValue,omitempty"`
  53. Value string `json:"value,omitempty"`
  54. // If the key existed before the action, this field should be true
  55. // If the key did not exist before the action, this field should be false
  56. NewKey bool `json:"newKey,omitempty"`
  57. Expiration *time.Time `json:"expiration,omitempty"`
  58. // Time to live in second
  59. TTL int64 `json:"ttl,omitempty"`
  60. // The command index of the raft machine when the command is executed
  61. Index uint64 `json:"index"`
  62. }
  63. // A listNode represent the simplest Key-Value pair with its type
  64. // It is only used when do list opeartion
  65. // We want to have a file system like store, thus we distingush "file"
  66. // and "directory"
  67. type ListNode struct {
  68. Key string
  69. Value string
  70. Type string
  71. }
  72. var PERMANENT = time.Unix(0, 0)
  73. //------------------------------------------------------------------------------
  74. //
  75. // Methods
  76. //
  77. //------------------------------------------------------------------------------
  78. // Create a new stroe
  79. // Arguement max is the max number of response we want to record
  80. func CreateStore(max int) *Store {
  81. s := new(Store)
  82. s.messager = nil
  83. s.ResponseMap = make(map[string]Response)
  84. s.ResponseStartIndex = 0
  85. s.ResponseMaxSize = max
  86. s.ResponseCurrSize = 0
  87. s.Tree = &tree{
  88. &treeNode{
  89. Node{
  90. "/",
  91. time.Unix(0, 0),
  92. nil,
  93. },
  94. true,
  95. make(map[string]*treeNode),
  96. },
  97. }
  98. s.watcher = createWatcherHub()
  99. return s
  100. }
  101. // Set the messager of the store
  102. func (s *Store) SetMessager(messager *chan string) {
  103. s.messager = messager
  104. }
  105. // Set the key to value with expiration time
  106. func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  107. //Update index
  108. s.Index = index
  109. key = path.Clean("/" + key)
  110. isExpire := !expireTime.Equal(PERMANENT)
  111. // base response
  112. resp := Response{
  113. Action: "SET",
  114. Key: key,
  115. Value: value,
  116. Index: index,
  117. }
  118. // When the slow follower receive the set command
  119. // the key may be expired, we should not add the node
  120. // also if the node exist, we need to delete the node
  121. if isExpire && expireTime.Sub(time.Now()) < 0 {
  122. return s.Delete(key, index)
  123. }
  124. var TTL int64
  125. // Update ttl
  126. if isExpire {
  127. TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  128. resp.Expiration = &expireTime
  129. resp.TTL = TTL
  130. }
  131. // Get the node
  132. node, ok := s.Tree.get(key)
  133. if ok {
  134. // Update when node exists
  135. // Node is not permanent
  136. if !node.ExpireTime.Equal(PERMANENT) {
  137. // If node is not permanent
  138. // Update its expireTime
  139. node.update <- expireTime
  140. } else {
  141. // If we want the permanent node to have expire time
  142. // We need to create create a go routine with a channel
  143. if isExpire {
  144. node.update = make(chan time.Time)
  145. go s.monitorExpiration(key, node.update, expireTime)
  146. }
  147. }
  148. // Update the information of the node
  149. s.Tree.set(key, Node{value, expireTime, node.update})
  150. resp.PrevValue = node.Value
  151. s.watcher.notify(resp)
  152. msg, err := json.Marshal(resp)
  153. // Send to the messager
  154. if s.messager != nil && err == nil {
  155. *s.messager <- string(msg)
  156. }
  157. s.addToResponseMap(index, &resp)
  158. return msg, err
  159. // Add new node
  160. } else {
  161. update := make(chan time.Time)
  162. ok := s.Tree.set(key, Node{value, expireTime, update})
  163. if !ok {
  164. err := NotFile(key)
  165. return nil, err
  166. }
  167. if isExpire {
  168. go s.monitorExpiration(key, update, expireTime)
  169. }
  170. resp.NewKey = true
  171. msg, err := json.Marshal(resp)
  172. // Nofity the watcher
  173. s.watcher.notify(resp)
  174. // Send to the messager
  175. if s.messager != nil && err == nil {
  176. *s.messager <- string(msg)
  177. }
  178. s.addToResponseMap(index, &resp)
  179. return msg, err
  180. }
  181. }
  182. // Get the value of the key
  183. func (s *Store) Get(key string) ([]byte, error) {
  184. resp := s.internalGet(key)
  185. if resp != nil {
  186. return json.Marshal(resp)
  187. } else {
  188. err := NotFoundError(key)
  189. return nil, err
  190. }
  191. }
  192. // Get the value of the key and return the raw response
  193. func (s *Store) internalGet(key string) *Response {
  194. key = path.Clean("/" + key)
  195. node, ok := s.Tree.get(key)
  196. if ok {
  197. var TTL int64
  198. var isExpire bool = false
  199. isExpire = !node.ExpireTime.Equal(PERMANENT)
  200. resp := &Response{
  201. Action: "GET",
  202. Key: key,
  203. Value: node.Value,
  204. Index: s.Index,
  205. }
  206. // Update ttl
  207. if isExpire {
  208. TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  209. resp.Expiration = &node.ExpireTime
  210. resp.TTL = TTL
  211. }
  212. return resp
  213. } else {
  214. // we do not found the key
  215. return nil
  216. }
  217. }
  218. // List all the item in the prefix
  219. func (s *Store) List(prefix string) ([]byte, error) {
  220. nodes, keys, dirs, ok := s.Tree.list(prefix)
  221. var ln []ListNode
  222. if ok {
  223. ln = make([]ListNode, len(nodes))
  224. for i := 0; i < len(nodes); i++ {
  225. ln[i] = ListNode{keys[i], nodes[i].Value, dirs[i]}
  226. }
  227. }
  228. err := NotFoundError(prefix)
  229. return nil, err
  230. }
  231. // Delete the key
  232. func (s *Store) Delete(key string, index uint64) ([]byte, error) {
  233. key = path.Clean("/" + key)
  234. //Update index
  235. s.Index = index
  236. node, ok := s.Tree.get(key)
  237. if ok {
  238. resp := Response{
  239. Action: "DELETE",
  240. Key: key,
  241. PrevValue: node.Value,
  242. Index: index,
  243. }
  244. if node.ExpireTime.Equal(PERMANENT) {
  245. s.Tree.delete(key)
  246. } else {
  247. resp.Expiration = &node.ExpireTime
  248. // Kill the expire go routine
  249. node.update <- PERMANENT
  250. s.Tree.delete(key)
  251. }
  252. msg, err := json.Marshal(resp)
  253. s.watcher.notify(resp)
  254. // notify the messager
  255. if s.messager != nil && err == nil {
  256. *s.messager <- string(msg)
  257. }
  258. s.addToResponseMap(index, &resp)
  259. return msg, err
  260. } else {
  261. err := NotFoundError(key)
  262. return nil, err
  263. }
  264. }
  265. // Set the value of the key to the value if the given prevValue is equal to the value of the key
  266. func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  267. resp := s.internalGet(key)
  268. if resp == nil {
  269. err := NotFoundError(key)
  270. return nil, err
  271. }
  272. if resp.Value == prevValue {
  273. // If test success, do set
  274. return s.Set(key, value, expireTime, index)
  275. } else {
  276. // If fails, return err
  277. err := TestFail(fmt.Sprintf("%s==%s", resp.Value, prevValue))
  278. return nil, err
  279. }
  280. }
  281. // Add a channel to the watchHub.
  282. // The watchHub will send response to the channel when any key under the prefix
  283. // changes [since the sinceIndex if given]
  284. func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
  285. return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, &s.ResponseMap)
  286. }
  287. // This function should be created as a go routine to delete the key-value pair
  288. // when it reaches expiration time
  289. func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
  290. duration := expireTime.Sub(time.Now())
  291. for {
  292. select {
  293. // Timeout delete the node
  294. case <-time.After(duration):
  295. node, ok := s.Tree.get(key)
  296. if !ok {
  297. return
  298. } else {
  299. s.Tree.delete(key)
  300. resp := Response{
  301. Action: "DELETE",
  302. Key: key,
  303. PrevValue: node.Value,
  304. Expiration: &node.ExpireTime,
  305. Index: s.Index,
  306. }
  307. msg, err := json.Marshal(resp)
  308. s.watcher.notify(resp)
  309. // notify the messager
  310. if s.messager != nil && err == nil {
  311. *s.messager <- string(msg)
  312. }
  313. return
  314. }
  315. case updateTime := <-update:
  316. // Update duration
  317. // If the node become a permanent one, the go routine is
  318. // not needed
  319. if updateTime.Equal(PERMANENT) {
  320. return
  321. }
  322. // Update duration
  323. duration = updateTime.Sub(time.Now())
  324. }
  325. }
  326. }
  327. // When we receive a command that will change the state of the key-value store
  328. // We will add the result of it to the ResponseMap for the use of watch command
  329. // Also we may remove the oldest response when we add new one
  330. func (s *Store) addToResponseMap(index uint64, resp *Response) {
  331. // zero case
  332. if s.ResponseMaxSize == 0 {
  333. return
  334. }
  335. strIndex := strconv.FormatUint(index, 10)
  336. s.ResponseMap[strIndex] = *resp
  337. // unlimited
  338. if s.ResponseMaxSize < 0 {
  339. s.ResponseCurrSize++
  340. return
  341. }
  342. // if we reach the max point, we need to delete the most latest
  343. // response and update the startIndex
  344. if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
  345. s.ResponseStartIndex++
  346. delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
  347. } else {
  348. s.ResponseCurrSize++
  349. }
  350. }
  351. // Save the current state of the storage system
  352. func (s *Store) Save() ([]byte, error) {
  353. b, err := json.Marshal(s)
  354. if err != nil {
  355. fmt.Println(err)
  356. return nil, err
  357. }
  358. return b, nil
  359. }
  360. // Recovery the state of the stroage system from a previous state
  361. func (s *Store) Recovery(state []byte) error {
  362. err := json.Unmarshal(state, s)
  363. // The only thing need to change after the recovery is the
  364. // node with expiration time, we need to delete all the node
  365. // that have been expired and setup go routines to monitor the
  366. // other ones
  367. s.checkExpiration()
  368. return err
  369. }
  370. // Clean the expired nodes
  371. // Set up go routines to mon
  372. func (s *Store) checkExpiration() {
  373. s.Tree.traverse(s.checkNode, false)
  374. }
  375. // Check each node
  376. func (s *Store) checkNode(key string, node *Node) {
  377. if node.ExpireTime.Equal(PERMANENT) {
  378. return
  379. } else {
  380. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  381. node.update = make(chan time.Time)
  382. go s.monitorExpiration(key, node.update, node.ExpireTime)
  383. } else {
  384. // we should delete this node
  385. s.Tree.delete(key)
  386. }
  387. }
  388. }