store.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "time"
  7. )
  8. // global store
  9. var s *Store
  10. // CONSTANTS
  11. const (
  12. ERROR = -1 + iota
  13. SET
  14. DELETE
  15. GET
  16. )
  17. var PERMANENT = time.Unix(0, 0)
  18. type Store struct {
  19. // use the build-in hash map as the key-value store structure
  20. Nodes map[string]Node `json:"nodes"`
  21. // the string channel to send messages to the outside world
  22. // now we use it to send changes to the hub of the web service
  23. messager *chan string
  24. }
  25. type Node struct {
  26. Value string `json:"value"`
  27. // if the node is a permanent one the ExprieTime will be Unix(0,0)
  28. // Otherwise after the expireTime, the node will be deleted
  29. ExpireTime time.Time `json:"expireTime"`
  30. // a channel to update the expireTime of the node
  31. update chan time.Time `json:"-"`
  32. }
  33. type Response struct {
  34. Action int `json:"action"`
  35. Key string `json:"key"`
  36. OldValue string `json:"oldValue"`
  37. NewValue string `json:"newValue"`
  38. // if the key existed before the action, this field should be true
  39. // if the key did not exist before the action, this field should be false
  40. Exist bool `json:"exist"`
  41. Expiration time.Time `json:"expiration"`
  42. }
  43. func init() {
  44. s = createStore()
  45. s.messager = nil
  46. }
  47. // make a new stroe
  48. func createStore() *Store {
  49. s := new(Store)
  50. s.Nodes = make(map[string]Node)
  51. return s
  52. }
  53. // return a pointer to the store
  54. func GetStore() *Store {
  55. return s
  56. }
  57. // set the messager of the store
  58. func (s *Store) SetMessager(messager *chan string) {
  59. s.messager = messager
  60. }
  61. // set the key to value, return the old value if the key exists
  62. func Set(key string, value string, expireTime time.Time) ([]byte, error) {
  63. key = path.Clean(key)
  64. var isExpire bool = false
  65. isExpire = !expireTime.Equal(PERMANENT)
  66. // when the slow follower receive the set command
  67. // the key may be expired, we should not add the node
  68. // also if the node exist, we need to delete the node
  69. if isExpire && expireTime.Sub(time.Now()) < 0 {
  70. return Delete(key)
  71. }
  72. // get the node
  73. node, ok := s.Nodes[key]
  74. if ok {
  75. // if node is not permanent before
  76. // update its expireTime
  77. if !node.ExpireTime.Equal(PERMANENT) {
  78. node.update <- expireTime
  79. } else {
  80. // if we want the permanent node to have expire time
  81. // we need to create a chan and create a go routine
  82. if isExpire {
  83. node.update = make(chan time.Time)
  84. go expire(key, node.update, expireTime)
  85. }
  86. }
  87. // update the information of the node
  88. s.Nodes[key] = Node{value, expireTime, node.update}
  89. resp := Response{SET, key, node.Value, value, true, expireTime}
  90. msg, err := json.Marshal(resp)
  91. notify(resp)
  92. // send to the messager
  93. if s.messager != nil && err == nil {
  94. *s.messager <- string(msg)
  95. }
  96. return msg, err
  97. // add new node
  98. } else {
  99. update := make(chan time.Time)
  100. s.Nodes[key] = Node{value, expireTime, update}
  101. if isExpire {
  102. go expire(key, update, expireTime)
  103. }
  104. resp := Response{SET, key, "", value, false, expireTime}
  105. msg, err := json.Marshal(resp)
  106. // nofity the watcher
  107. notify(resp)
  108. // notify the web interface
  109. if s.messager != nil && err == nil {
  110. *s.messager <- string(msg)
  111. }
  112. return msg, err
  113. }
  114. }
  115. // should be used as a go routine to delete the key when it expires
  116. func expire(key string, update chan time.Time, expireTime time.Time) {
  117. duration := expireTime.Sub(time.Now())
  118. for {
  119. select {
  120. // timeout delete the node
  121. case <-time.After(duration):
  122. node, ok := s.Nodes[key]
  123. if !ok {
  124. return
  125. } else {
  126. delete(s.Nodes, key)
  127. resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime}
  128. msg, err := json.Marshal(resp)
  129. notify(resp)
  130. // notify the messager
  131. if s.messager != nil && err == nil {
  132. *s.messager <- string(msg)
  133. }
  134. return
  135. }
  136. case updateTime := <-update:
  137. //update duration
  138. // if the node become a permanent one, the go routine is
  139. // not needed
  140. if updateTime.Equal(PERMANENT) {
  141. fmt.Println("permanent")
  142. return
  143. }
  144. // update duration
  145. duration = updateTime.Sub(time.Now())
  146. }
  147. }
  148. }
  149. // get the value of the key
  150. func Get(key string) Response {
  151. key = path.Clean(key)
  152. node, ok := s.Nodes[key]
  153. if ok {
  154. return Response{GET, key, node.Value, node.Value, true, node.ExpireTime}
  155. } else {
  156. return Response{GET, key, "", "", false, time.Unix(0, 0)}
  157. }
  158. }
  159. // delete the key
  160. func Delete(key string) ([]byte, error) {
  161. key = path.Clean(key)
  162. node, ok := s.Nodes[key]
  163. if ok {
  164. if node.ExpireTime.Equal(PERMANENT) {
  165. delete(s.Nodes, key)
  166. } else {
  167. // kill the expire go routine
  168. node.update <- PERMANENT
  169. delete(s.Nodes, key)
  170. }
  171. resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime}
  172. msg, err := json.Marshal(resp)
  173. notify(resp)
  174. // notify the messager
  175. if s.messager != nil && err == nil {
  176. *s.messager <- string(msg)
  177. }
  178. return msg, err
  179. } else {
  180. return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0)})
  181. }
  182. }
  183. // save the current state of the storage system
  184. func (s *Store) Save() ([]byte, error) {
  185. b, err := json.Marshal(s)
  186. if err != nil {
  187. fmt.Println(err)
  188. return nil, err
  189. }
  190. return b, nil
  191. }
  192. // recovery the state of the stroage system from a previous state
  193. func (s *Store) Recovery(state []byte) error {
  194. err := json.Unmarshal(state, s)
  195. // clean the expired nodes
  196. clean()
  197. return err
  198. }
  199. // clean all expired keys
  200. func clean() {
  201. for key, node := range s.Nodes {
  202. if node.ExpireTime.Equal(PERMANENT) {
  203. continue
  204. } else {
  205. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  206. node.update = make(chan time.Time)
  207. go expire(key, node.update, node.ExpireTime)
  208. } else {
  209. // we should delete this node
  210. delete(s.Nodes, key)
  211. }
  212. }
  213. }
  214. }