store.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "strconv"
  7. "time"
  8. )
  9. //------------------------------------------------------------------------------
  10. //
  11. // Typedefs
  12. //
  13. //------------------------------------------------------------------------------
  14. // The main struct of the Key-Value store
  15. type Store struct {
  16. // key-value store structure
  17. Tree *tree
  18. // WatcherHub is where we register all the clients
  19. // who issue a watch request
  20. watcher *WatcherHub
  21. // The string channel to send messages to the outside world
  22. // Now we use it to send changes to the hub of the web service
  23. messager *chan string
  24. // A map to keep the recent response to the clients
  25. ResponseMap map[string]Response
  26. // The max number of the recent responses we can record
  27. ResponseMaxSize int
  28. // The current number of the recent responses we have recorded
  29. ResponseCurrSize uint
  30. // The index of the first recent responses we have
  31. ResponseStartIndex uint64
  32. // Current index of the raft machine
  33. Index uint64
  34. }
  // Node is the value half of a key-value pair held in the store. Next to the
  // value it records the expiration time and a channel used to talk to the
  // goroutine that counts down to that expiration.
  //
  // The field order is significant: nodes are built with positional
  // composite literals elsewhere in this file.
  type Node struct {
  	// Value is the string payload of the node.
  	Value string `json:"value"`

  	// ExpireTime is Unix(0, 0) for a permanent node; otherwise the node is
  	// deleted once this instant has passed.
  	ExpireTime time.Time `json:"expireTime"`

  	// update delivers a new expiration time to the monitoring goroutine.
  	// It is unexported and therefore never serialized.
  	update chan time.Time `json:"-"`
  }
  // Response is what the store hands back to a user who issued a command.
  // Optional fields are omitted from the JSON encoding when empty.
  type Response struct {
  	Action    string `json:"action"`
  	Key       string `json:"key"`
  	Dir       bool   `json:"dir,omitempty"`
  	PrevValue string `json:"prevValue,omitempty"`
  	Value     string `json:"value,omitempty"`

  	// NewKey is set to true when the key did not exist before the action.
  	NewKey bool `json:"newKey,omitempty"`

  	Expiration *time.Time `json:"expiration,omitempty"`

  	// TTL is the remaining time to live, in seconds.
  	TTL int64 `json:"ttl,omitempty"`

  	// Index is the command index of the raft machine at the moment the
  	// command was executed.
  	Index uint64 `json:"index"`
  }
  // ListNode is the simplest representation of a key-value pair together with
  // its type. It is only used by the list operation. The store is meant to
  // look like a file system, so Type distinguishes a "file" from a
  // "directory".
  type ListNode struct {
  	Key   string
  	Value string
  	Type  string
  }
  // PERMANENT is the sentinel expiration time, Unix(0, 0), carried by nodes
  // that never expire. Compare against it with Time.Equal.
  var PERMANENT time.Time = time.Unix(0, 0)
  74. //------------------------------------------------------------------------------
  75. //
  76. // Methods
  77. //
  78. //------------------------------------------------------------------------------
  79. // Create a new stroe
  80. // Arguement max is the max number of response we want to record
  81. func CreateStore(max int) *Store {
  82. s := new(Store)
  83. s.messager = nil
  84. s.ResponseMap = make(map[string]Response)
  85. s.ResponseStartIndex = 0
  86. s.ResponseMaxSize = max
  87. s.ResponseCurrSize = 0
  88. s.Tree = &tree{
  89. &treeNode{
  90. Node{
  91. "/",
  92. time.Unix(0, 0),
  93. nil,
  94. },
  95. true,
  96. make(map[string]*treeNode),
  97. },
  98. }
  99. s.watcher = createWatcherHub()
  100. return s
  101. }
  102. // Set the messager of the store
  103. func (s *Store) SetMessager(messager *chan string) {
  104. s.messager = messager
  105. }
  106. // Set the key to value with expiration time
  107. func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  108. //Update index
  109. s.Index = index
  110. key = path.Clean("/" + key)
  111. isExpire := !expireTime.Equal(PERMANENT)
  112. // base response
  113. resp := Response{
  114. Action: "SET",
  115. Key: key,
  116. Value: value,
  117. Index: index,
  118. }
  119. // When the slow follower receive the set command
  120. // the key may be expired, we should not add the node
  121. // also if the node exist, we need to delete the node
  122. if isExpire && expireTime.Sub(time.Now()) < 0 {
  123. return s.Delete(key, index)
  124. }
  125. var TTL int64
  126. // Update ttl
  127. if isExpire {
  128. TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  129. resp.Expiration = &expireTime
  130. resp.TTL = TTL
  131. }
  132. // Get the node
  133. node, ok := s.Tree.get(key)
  134. if ok {
  135. // Update when node exists
  136. // Node is not permanent
  137. if !node.ExpireTime.Equal(PERMANENT) {
  138. // If node is not permanent
  139. // Update its expireTime
  140. node.update <- expireTime
  141. } else {
  142. // If we want the permanent node to have expire time
  143. // We need to create create a go routine with a channel
  144. if isExpire {
  145. node.update = make(chan time.Time)
  146. go s.monitorExpiration(key, node.update, expireTime)
  147. }
  148. }
  149. // Update the information of the node
  150. s.Tree.set(key, Node{value, expireTime, node.update})
  151. resp.PrevValue = node.Value
  152. s.watcher.notify(resp)
  153. msg, err := json.Marshal(resp)
  154. // Send to the messager
  155. if s.messager != nil && err == nil {
  156. *s.messager <- string(msg)
  157. }
  158. s.addToResponseMap(index, &resp)
  159. return msg, err
  160. // Add new node
  161. } else {
  162. update := make(chan time.Time)
  163. ok := s.Tree.set(key, Node{value, expireTime, update})
  164. if !ok {
  165. err := NotFile(key)
  166. return nil, err
  167. }
  168. if isExpire {
  169. go s.monitorExpiration(key, update, expireTime)
  170. }
  171. resp.NewKey = true
  172. msg, err := json.Marshal(resp)
  173. // Nofity the watcher
  174. s.watcher.notify(resp)
  175. // Send to the messager
  176. if s.messager != nil && err == nil {
  177. *s.messager <- string(msg)
  178. }
  179. s.addToResponseMap(index, &resp)
  180. return msg, err
  181. }
  182. }
  183. // Get the value of the key and return the raw response
  184. func (s *Store) internalGet(key string) *Response {
  185. key = path.Clean("/" + key)
  186. node, ok := s.Tree.get(key)
  187. if ok {
  188. var TTL int64
  189. var isExpire bool = false
  190. isExpire = !node.ExpireTime.Equal(PERMANENT)
  191. resp := &Response{
  192. Action: "GET",
  193. Key: key,
  194. Value: node.Value,
  195. Index: s.Index,
  196. }
  197. // Update ttl
  198. if isExpire {
  199. TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  200. resp.Expiration = &node.ExpireTime
  201. resp.TTL = TTL
  202. }
  203. return resp
  204. } else {
  205. // we do not found the key
  206. return nil
  207. }
  208. }
  209. // Get all the items under key
  210. // If key is a file return the file
  211. // If key is a directory reuturn an array of files
  212. func (s *Store) Get(key string) ([]byte, error) {
  213. key = path.Clean("/" + key)
  214. nodes, keys, dirs, ok := s.Tree.list(key)
  215. if ok {
  216. resps := make([]Response, len(nodes))
  217. for i := 0; i < len(nodes); i++ {
  218. var TTL int64
  219. var isExpire bool = false
  220. isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
  221. resps[i] = Response{
  222. Action: "GET",
  223. Index: s.Index,
  224. Key: path.Join(key, keys[i]),
  225. }
  226. if !dirs[i] {
  227. resps[i].Value = nodes[i].Value
  228. } else {
  229. resps[i].Dir = true
  230. }
  231. // Update ttl
  232. if isExpire {
  233. TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
  234. resps[i].Expiration = &nodes[i].ExpireTime
  235. resps[i].TTL = TTL
  236. }
  237. }
  238. if len(resps) == 1 {
  239. return json.Marshal(resps[0])
  240. }
  241. return json.Marshal(resps)
  242. }
  243. err := NotFoundError(key)
  244. return nil, err
  245. }
  246. // Delete the key
  247. func (s *Store) Delete(key string, index uint64) ([]byte, error) {
  248. key = path.Clean("/" + key)
  249. //Update index
  250. s.Index = index
  251. node, ok := s.Tree.get(key)
  252. if ok {
  253. resp := Response{
  254. Action: "DELETE",
  255. Key: key,
  256. PrevValue: node.Value,
  257. Index: index,
  258. }
  259. if node.ExpireTime.Equal(PERMANENT) {
  260. s.Tree.delete(key)
  261. } else {
  262. resp.Expiration = &node.ExpireTime
  263. // Kill the expire go routine
  264. node.update <- PERMANENT
  265. s.Tree.delete(key)
  266. }
  267. msg, err := json.Marshal(resp)
  268. s.watcher.notify(resp)
  269. // notify the messager
  270. if s.messager != nil && err == nil {
  271. *s.messager <- string(msg)
  272. }
  273. s.addToResponseMap(index, &resp)
  274. return msg, err
  275. } else {
  276. err := NotFoundError(key)
  277. return nil, err
  278. }
  279. }
  280. // Set the value of the key to the value if the given prevValue is equal to the value of the key
  281. func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  282. resp := s.internalGet(key)
  283. if resp == nil {
  284. err := NotFoundError(key)
  285. return nil, err
  286. }
  287. if resp.Value == prevValue {
  288. // If test success, do set
  289. return s.Set(key, value, expireTime, index)
  290. } else {
  291. // If fails, return err
  292. err := TestFail(fmt.Sprintf("TestAndSet: %s!=%s", resp.Value, prevValue))
  293. return nil, err
  294. }
  295. }
  296. // Add a channel to the watchHub.
  297. // The watchHub will send response to the channel when any key under the prefix
  298. // changes [since the sinceIndex if given]
  299. func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
  300. return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, &s.ResponseMap)
  301. }
  302. // This function should be created as a go routine to delete the key-value pair
  303. // when it reaches expiration time
  304. func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
  305. duration := expireTime.Sub(time.Now())
  306. for {
  307. select {
  308. // Timeout delete the node
  309. case <-time.After(duration):
  310. node, ok := s.Tree.get(key)
  311. if !ok {
  312. return
  313. } else {
  314. s.Tree.delete(key)
  315. resp := Response{
  316. Action: "DELETE",
  317. Key: key,
  318. PrevValue: node.Value,
  319. Expiration: &node.ExpireTime,
  320. Index: s.Index,
  321. }
  322. msg, err := json.Marshal(resp)
  323. s.watcher.notify(resp)
  324. // notify the messager
  325. if s.messager != nil && err == nil {
  326. *s.messager <- string(msg)
  327. }
  328. return
  329. }
  330. case updateTime := <-update:
  331. // Update duration
  332. // If the node become a permanent one, the go routine is
  333. // not needed
  334. if updateTime.Equal(PERMANENT) {
  335. return
  336. }
  337. // Update duration
  338. duration = updateTime.Sub(time.Now())
  339. }
  340. }
  341. }
  342. // When we receive a command that will change the state of the key-value store
  343. // We will add the result of it to the ResponseMap for the use of watch command
  344. // Also we may remove the oldest response when we add new one
  345. func (s *Store) addToResponseMap(index uint64, resp *Response) {
  346. // zero case
  347. if s.ResponseMaxSize == 0 {
  348. return
  349. }
  350. strIndex := strconv.FormatUint(index, 10)
  351. s.ResponseMap[strIndex] = *resp
  352. // unlimited
  353. if s.ResponseMaxSize < 0 {
  354. s.ResponseCurrSize++
  355. return
  356. }
  357. // if we reach the max point, we need to delete the most latest
  358. // response and update the startIndex
  359. if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
  360. s.ResponseStartIndex++
  361. delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
  362. } else {
  363. s.ResponseCurrSize++
  364. }
  365. }
  366. // Save the current state of the storage system
  367. func (s *Store) Save() ([]byte, error) {
  368. b, err := json.Marshal(s)
  369. if err != nil {
  370. fmt.Println(err)
  371. return nil, err
  372. }
  373. return b, nil
  374. }
  375. // Recovery the state of the stroage system from a previous state
  376. func (s *Store) Recovery(state []byte) error {
  377. err := json.Unmarshal(state, s)
  378. // The only thing need to change after the recovery is the
  379. // node with expiration time, we need to delete all the node
  380. // that have been expired and setup go routines to monitor the
  381. // other ones
  382. s.checkExpiration()
  383. return err
  384. }
  385. // Clean the expired nodes
  386. // Set up go routines to mon
  387. func (s *Store) checkExpiration() {
  388. s.Tree.traverse(s.checkNode, false)
  389. }
  390. // Check each node
  391. func (s *Store) checkNode(key string, node *Node) {
  392. if node.ExpireTime.Equal(PERMANENT) {
  393. return
  394. } else {
  395. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  396. node.update = make(chan time.Time)
  397. go s.monitorExpiration(key, node.update, node.ExpireTime)
  398. } else {
  399. // we should delete this node
  400. s.Tree.delete(key)
  401. }
  402. }
  403. }