store.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "strconv"
  7. "sync"
  8. "time"
  9. )
  10. //------------------------------------------------------------------------------
  11. //
  12. // Typedefs
  13. //
  14. //------------------------------------------------------------------------------
  // Store is the main structure of the key-value store.
//
// Exported fields form the persisted state: Save json.Marshals a clone of the
// store and Recovery json.Unmarshals a snapshot back into it. Unexported
// fields (mutex, watcher, messager) are runtime-only and never serialized.
type Store struct {
	// Tree holds the key-value data as a hierarchy of nodes.
	Tree *tree

	// mutex guards the store's state. AddWatcher deliberately does not take
	// it: a watch completes later, when one of the locked mutating methods
	// notifies the watcher hub. Save holds it only while cloning the store,
	// so the slow JSON encoding of the snapshot happens outside the lock.
	// NOTE(review): AddWatcher still reads ResponseStartIndex, Index and
	// ResponseMap without this lock; confirm that is safe.
	mutex sync.RWMutex

	// watcher is the hub where clients that issued a watch request are
	// registered; it is notified of every change.
	watcher *WatcherHub

	// messager, when non-nil, receives the JSON encoding of every change.
	// Per the original design it feeds the hub of the web service.
	messager chan<- string

	// ResponseMap keeps recent responses keyed by the decimal form of their
	// command index, so that watchers can be answered from history.
	ResponseMap map[string]*Response

	// ResponseMaxSize bounds ResponseMap: 0 disables recording and a negative
	// value records without limit.
	ResponseMaxSize int

	// ResponseCurrSize is the number of responses counted as recorded.
	ResponseCurrSize uint

	// ResponseStartIndex is advanced by addToResponseMap as old responses are
	// evicted from ResponseMap.
	ResponseStartIndex uint64

	// Index is the raft index of the most recently applied mutating command.
	Index uint64

	// BasicStats counts the gets, sets, deletes and test-and-sets served.
	BasicStats EtcdStats
}
  // Node is the value half of a key-value pair in the store.
//
// Besides the value it records when the pair expires. Expiration is enforced
// by a separate goroutine (monitorExpiration), and update is the channel used
// to tell that goroutine about a new expiration time.
//
// The field order is significant: the store builds Nodes with positional
// composite literals.
type Node struct {
	// Value is the string value of the node.
	Value string `json:"value"`

	// ExpireTime is PERMANENT for a node that never expires. Otherwise the
	// node is deleted once this time has passed.
	ExpireTime time.Time `json:"expireTime"`

	// update delivers new expiration times to the monitoring goroutine.
	// Being unexported, it is never serialized. It therefore carries no json
	// tag (go vet's structtag check rejects tags on unexported fields).
	update chan time.Time
}
  // Response is the result of a store command. It is JSON-encoded for the
// client, passed to watchers, and recorded in Store.ResponseMap.
type Response struct {
	// Action is the command that produced the response: "GET", "SET" or "DELETE".
	Action string `json:"action"`
	// Key is the cleaned, absolute key the command operated on.
	Key string `json:"key"`
	// Dir is set by RawGet for listed entries that have no value, which are
	// reported as directories.
	Dir bool `json:"dir,omitempty"`
	// PrevValue is the value the key held before a SET or DELETE.
	PrevValue string `json:"prevValue,omitempty"`
	// Value is the key's value as read (GET) or as written (SET).
	Value string `json:"value,omitempty"`
	// NewKey is true when the key did not exist before the action.
	NewKey bool `json:"newKey,omitempty"`
	// Expiration is the key's expiration time; it is nil for permanent keys.
	Expiration *time.Time `json:"expiration,omitempty"`
	// TTL is the remaining time to live, in whole seconds.
	TTL int64 `json:"ttl,omitempty"`
	// Index is the raft command index at which the command was executed.
	Index uint64 `json:"index"`
}
  // ListNode is the simplest key/value pair together with its type. It is
// used only by list operations. The store behaves like a file system, so
// Type records whether an entry is a file or a directory.
type ListNode struct {
	Key, Value, Type string
}
  // PERMANENT is the sentinel expiration time (the Unix epoch) that marks a
// node as never expiring. Compare against it with time.Time.Equal, as the
// store does, rather than with ==.
var PERMANENT = time.Unix(0, 0)
  84. //------------------------------------------------------------------------------
  85. //
  86. // Methods
  87. //
  88. //------------------------------------------------------------------------------
  89. // Create a new stroe
  90. // Arguement max is the max number of response we want to record
  91. func CreateStore(max int) *Store {
  92. s := new(Store)
  93. s.messager = nil
  94. s.ResponseMap = make(map[string]*Response)
  95. s.ResponseStartIndex = 0
  96. s.ResponseMaxSize = max
  97. s.ResponseCurrSize = 0
  98. s.Tree = &tree{
  99. &treeNode{
  100. Node{
  101. "/",
  102. time.Unix(0, 0),
  103. nil,
  104. },
  105. true,
  106. make(map[string]*treeNode),
  107. },
  108. }
  109. s.watcher = newWatcherHub()
  110. return s
  111. }
  // SetMessager sets the channel that receives the JSON encoding of every
// change made to the store. A nil channel disables these messages.
// NOTE(review): the field is written without taking s.mutex. Presumably this
// is called during setup, before the store is used concurrently — confirm.
func (s *Store) SetMessager(messager chan<- string) {
	s.messager = messager
}
  // Set stores value under key with the given expiration time (PERMANENT for
// none) and records the change at raft index index. It holds the write lock
// for the whole operation. Its results are those of internalSet.
func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.internalSet(key, value, expireTime, index)
}
  121. // Set the key to value with expiration time
  122. func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  123. //Update index
  124. s.Index = index
  125. //Update stats
  126. s.BasicStats.Sets++
  127. key = path.Clean("/" + key)
  128. isExpire := !expireTime.Equal(PERMANENT)
  129. // base response
  130. resp := Response{
  131. Action: "SET",
  132. Key: key,
  133. Value: value,
  134. Index: index,
  135. }
  136. // When the slow follower receive the set command
  137. // the key may be expired, we should not add the node
  138. // also if the node exist, we need to delete the node
  139. if isExpire && expireTime.Sub(time.Now()) < 0 {
  140. return s.internalDelete(key, index)
  141. }
  142. var TTL int64
  143. // Update ttl
  144. if isExpire {
  145. TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  146. resp.Expiration = &expireTime
  147. resp.TTL = TTL
  148. }
  149. // Get the node
  150. node, ok := s.Tree.get(key)
  151. if ok {
  152. // Update when node exists
  153. // Node is not permanent
  154. if !node.ExpireTime.Equal(PERMANENT) {
  155. // If node is not permanent
  156. // Update its expireTime
  157. node.update <- expireTime
  158. } else {
  159. // If we want the permanent node to have expire time
  160. // We need to create a go routine with a channel
  161. if isExpire {
  162. node.update = make(chan time.Time)
  163. go s.monitorExpiration(key, node.update, expireTime)
  164. }
  165. }
  166. // Update the information of the node
  167. s.Tree.set(key, Node{value, expireTime, node.update})
  168. resp.PrevValue = node.Value
  169. s.watcher.notify(resp)
  170. msg, err := json.Marshal(resp)
  171. // Send to the messager
  172. if s.messager != nil && err == nil {
  173. s.messager <- string(msg)
  174. }
  175. s.addToResponseMap(index, &resp)
  176. return msg, err
  177. // Add new node
  178. } else {
  179. update := make(chan time.Time)
  180. ok := s.Tree.set(key, Node{value, expireTime, update})
  181. if !ok {
  182. err := NotFile(key)
  183. return nil, err
  184. }
  185. if isExpire {
  186. go s.monitorExpiration(key, update, expireTime)
  187. }
  188. resp.NewKey = true
  189. msg, err := json.Marshal(resp)
  190. // Nofity the watcher
  191. s.watcher.notify(resp)
  192. // Send to the messager
  193. if s.messager != nil && err == nil {
  194. s.messager <- string(msg)
  195. }
  196. s.addToResponseMap(index, &resp)
  197. return msg, err
  198. }
  199. }
  200. // Get the value of the key and return the raw response
  201. func (s *Store) internalGet(key string) *Response {
  202. key = path.Clean("/" + key)
  203. node, ok := s.Tree.get(key)
  204. if ok {
  205. var TTL int64
  206. var isExpire bool = false
  207. isExpire = !node.ExpireTime.Equal(PERMANENT)
  208. resp := &Response{
  209. Action: "GET",
  210. Key: key,
  211. Value: node.Value,
  212. Index: s.Index,
  213. }
  214. // Update ttl
  215. if isExpire {
  216. TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  217. resp.Expiration = &node.ExpireTime
  218. resp.TTL = TTL
  219. }
  220. return resp
  221. } else {
  222. // we do not found the key
  223. return nil
  224. }
  225. }
  226. // Get all the items under key
  227. // If key is a file return the file
  228. // If key is a directory reuturn an array of files
  229. func (s *Store) Get(key string) ([]byte, error) {
  230. s.mutex.RLock()
  231. defer s.mutex.RUnlock()
  232. resps, err := s.RawGet(key)
  233. if err != nil {
  234. return nil, err
  235. }
  236. key = path.Clean("/" + key)
  237. // If the number of resps == 1 and the response key
  238. // is the key we query, a signal key-value should
  239. // be returned
  240. if len(resps) == 1 && resps[0].Key == key {
  241. return json.Marshal(resps[0])
  242. }
  243. return json.Marshal(resps)
  244. }
  245. func (s *Store) RawGet(key string) ([]*Response, error) {
  246. // Update stats
  247. s.BasicStats.Gets++
  248. key = path.Clean("/" + key)
  249. nodes, keys, ok := s.Tree.list(key)
  250. if ok {
  251. node, ok := nodes.(*Node)
  252. if ok {
  253. resps := make([]*Response, 1)
  254. isExpire := !node.ExpireTime.Equal(PERMANENT)
  255. resps[0] = &Response{
  256. Action: "GET",
  257. Index: s.Index,
  258. Key: key,
  259. Value: node.Value,
  260. }
  261. // Update ttl
  262. if isExpire {
  263. TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  264. resps[0].Expiration = &node.ExpireTime
  265. resps[0].TTL = TTL
  266. }
  267. return resps, nil
  268. }
  269. nodes, _ := nodes.([]*Node)
  270. resps := make([]*Response, len(nodes))
  271. for i := 0; i < len(nodes); i++ {
  272. var TTL int64
  273. var isExpire bool = false
  274. isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
  275. resps[i] = &Response{
  276. Action: "GET",
  277. Index: s.Index,
  278. Key: path.Join(key, keys[i]),
  279. }
  280. if len(nodes[i].Value) != 0 {
  281. resps[i].Value = nodes[i].Value
  282. } else {
  283. resps[i].Dir = true
  284. }
  285. // Update ttl
  286. if isExpire {
  287. TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
  288. resps[i].Expiration = &nodes[i].ExpireTime
  289. resps[i].TTL = TTL
  290. }
  291. }
  292. return resps, nil
  293. }
  294. err := NotFoundError(key)
  295. return nil, err
  296. }
  // Delete removes key and records the deletion at raft index index. It holds
// the write lock for the whole operation. Its results are those of
// internalDelete.
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.internalDelete(key, index)
}
  302. // Delete the key
  303. func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
  304. // Update stats
  305. s.BasicStats.Deletes++
  306. key = path.Clean("/" + key)
  307. // Update index
  308. s.Index = index
  309. node, ok := s.Tree.get(key)
  310. if ok {
  311. resp := Response{
  312. Action: "DELETE",
  313. Key: key,
  314. PrevValue: node.Value,
  315. Index: index,
  316. }
  317. if node.ExpireTime.Equal(PERMANENT) {
  318. s.Tree.delete(key)
  319. } else {
  320. resp.Expiration = &node.ExpireTime
  321. // Kill the expire go routine
  322. node.update <- PERMANENT
  323. s.Tree.delete(key)
  324. }
  325. msg, err := json.Marshal(resp)
  326. s.watcher.notify(resp)
  327. // notify the messager
  328. if s.messager != nil && err == nil {
  329. s.messager <- string(msg)
  330. }
  331. s.addToResponseMap(index, &resp)
  332. return msg, err
  333. } else {
  334. err := NotFoundError(key)
  335. return nil, err
  336. }
  337. }
  338. // Set the value of the key to the value if the given prevValue is equal to the value of the key
  339. func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  340. s.mutex.Lock()
  341. defer s.mutex.Unlock()
  342. // Update stats
  343. s.BasicStats.TestAndSets++
  344. resp := s.internalGet(key)
  345. if resp == nil {
  346. err := NotFoundError(key)
  347. return nil, err
  348. }
  349. if resp.Value == prevValue {
  350. // If test success, do set
  351. return s.internalSet(key, value, expireTime, index)
  352. } else {
  353. // If fails, return err
  354. err := TestFail(fmt.Sprintf("TestAndSet: %s!=%s", resp.Value, prevValue))
  355. return nil, err
  356. }
  357. }
  // AddWatcher registers watcher with the watcher hub. The hub sends a
// response to the watcher when any key under prefix changes. If sinceIndex
// is given, it may instead report changes since that index using the
// recorded ResponseMap.
//
// It does not take s.mutex: the watch completes later, when a locked
// mutating method notifies the hub.
// NOTE(review): ResponseStartIndex, Index and ResponseMap are read here
// without s.mutex while writers may be updating them. ResponseMap is a Go
// map, and unsynchronized concurrent access to it can fault. Confirm whether
// these values should be read under the lock.
func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
	return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
}
  364. // This function should be created as a go routine to delete the key-value pair
  365. // when it reaches expiration time
  366. func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
  367. duration := expireTime.Sub(time.Now())
  368. for {
  369. select {
  370. // Timeout delete the node
  371. case <-time.After(duration):
  372. node, ok := s.Tree.get(key)
  373. if !ok {
  374. return
  375. } else {
  376. s.mutex.Lock()
  377. s.Tree.delete(key)
  378. resp := Response{
  379. Action: "DELETE",
  380. Key: key,
  381. PrevValue: node.Value,
  382. Expiration: &node.ExpireTime,
  383. Index: s.Index,
  384. }
  385. s.mutex.Unlock()
  386. msg, err := json.Marshal(resp)
  387. s.watcher.notify(resp)
  388. // notify the messager
  389. if s.messager != nil && err == nil {
  390. s.messager <- string(msg)
  391. }
  392. return
  393. }
  394. case updateTime := <-update:
  395. // Update duration
  396. // If the node become a permanent one, the go routine is
  397. // not needed
  398. if updateTime.Equal(PERMANENT) {
  399. return
  400. }
  401. // Update duration
  402. duration = updateTime.Sub(time.Now())
  403. }
  404. }
  405. }
  406. // When we receive a command that will change the state of the key-value store
  407. // We will add the result of it to the ResponseMap for the use of watch command
  408. // Also we may remove the oldest response when we add new one
  409. func (s *Store) addToResponseMap(index uint64, resp *Response) {
  410. // zero case
  411. if s.ResponseMaxSize == 0 {
  412. return
  413. }
  414. strIndex := strconv.FormatUint(index, 10)
  415. s.ResponseMap[strIndex] = resp
  416. // unlimited
  417. if s.ResponseMaxSize < 0 {
  418. s.ResponseCurrSize++
  419. return
  420. }
  421. // if we reach the max point, we need to delete the most latest
  422. // response and update the startIndex
  423. if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
  424. s.ResponseStartIndex++
  425. delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
  426. } else {
  427. s.ResponseCurrSize++
  428. }
  429. }
  430. func (s *Store) clone() *Store {
  431. newStore := &Store{
  432. ResponseMaxSize: s.ResponseMaxSize,
  433. ResponseCurrSize: s.ResponseCurrSize,
  434. ResponseStartIndex: s.ResponseStartIndex,
  435. Index: s.Index,
  436. BasicStats: s.BasicStats,
  437. }
  438. newStore.Tree = s.Tree.clone()
  439. newStore.ResponseMap = make(map[string]*Response)
  440. for index, response := range s.ResponseMap {
  441. newStore.ResponseMap[index] = response
  442. }
  443. return newStore
  444. }
  445. // Save the current state of the storage system
  446. func (s *Store) Save() ([]byte, error) {
  447. // first we clone the store
  448. // json is very slow, we cannot hold the lock for such a long time
  449. s.mutex.Lock()
  450. cloneStore := s.clone()
  451. s.mutex.Unlock()
  452. b, err := json.Marshal(cloneStore)
  453. if err != nil {
  454. fmt.Println(err)
  455. return nil, err
  456. }
  457. return b, nil
  458. }
  // Recovery restores the store's state from a snapshot produced by Save. It
// holds the write lock throughout and proceeds in three steps:
//  1. Stop the current watchers, which clears the watcher hub.
//  2. Decode the snapshot into s.
//  3. Run checkExpiration, which deletes nodes that are expired or nearly
//     expired and starts monitor goroutines for the remaining expiring ones.
//
// The json.Unmarshal error, if any, is returned after checkExpiration runs.
//
// NOTE(review): json.Unmarshal reuses existing non-nil maps, keeping their
// entries (e.g. ResponseMap), rather than replacing them. Monitor goroutines
// started for the previous tree are not stopped here. Confirm that neither
// can leave stale state behind.
func (s *Store) Recovery(state []byte) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	// Stop all current watchers; this clears the watcher hub.
	s.watcher.stopWatchers()
	err := json.Unmarshal(state, s)
	// After decoding, only nodes with an expiration time need fixing up:
	// delete those that have expired and start goroutines to monitor the
	// others.
	s.checkExpiration()
	return err
}
  // checkExpiration applies checkNode to the nodes visited by s.Tree.traverse.
// Expired or nearly expired nodes are deleted, and the remaining expiring
// nodes get monitor goroutines. Recovery calls it after decoding a snapshot.
func (s *Store) checkExpiration() {
	s.Tree.traverse(s.checkNode, false)
}
  479. // Check each node
  480. func (s *Store) checkNode(key string, node *Node) {
  481. if node.ExpireTime.Equal(PERMANENT) {
  482. return
  483. } else {
  484. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  485. node.update = make(chan time.Time)
  486. go s.monitorExpiration(key, node.update, node.ExpireTime)
  487. } else {
  488. // we should delete this node
  489. s.Tree.delete(key)
  490. }
  491. }
  492. }