store.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. etcdErr "github.com/coreos/etcd/error"
  6. "path"
  7. "strconv"
  8. "sync"
  9. "time"
  10. )
  11. //------------------------------------------------------------------------------
  12. //
  13. // Typedefs
  14. //
  15. //------------------------------------------------------------------------------
  16. // The main struct of the Key-Value store
  17. type Store struct {
  18. // key-value store structure
  19. Tree *tree
  20. // This mutex protects everything except add watcher member.
  21. // Add watch member does not depend on the current state of the store.
  22. // And watch will return when other protected function is called and reach
  23. // the watching condition.
  24. // It is needed so that clone() can atomically replicate the Store
  25. // and do the log snapshot in a go routine.
  26. mutex sync.RWMutex
  27. // WatcherHub is where we register all the clients
  28. // who issue a watch request
  29. watcher *WatcherHub
  30. // The string channel to send messages to the outside world
  31. // Now we use it to send changes to the hub of the web service
  32. messager chan<- string
  33. // A map to keep the recent response to the clients
  34. ResponseMap map[string]*Response
  35. // The max number of the recent responses we can record
  36. ResponseMaxSize int
  37. // The current number of the recent responses we have recorded
  38. ResponseCurrSize uint
  39. // The index of the first recent responses we have
  40. ResponseStartIndex uint64
  41. // Current index of the raft machine
  42. Index uint64
  43. // Basic statistics information of etcd storage
  44. BasicStats EtcdStats
  45. }
  // Node is the value half of a key-value pair in the store. Besides the
  // value it carries the expiration time and the channel used to talk to the
  // goroutine that counts down to that expiration.
  type Node struct {
  	// Value is the string value of the node.
  	Value string `json:"value"`

  	// ExpireTime is Unix(0,0) for a permanent node. Otherwise the node is
  	// deleted once ExpireTime has passed.
  	ExpireTime time.Time `json:"expireTime"`

  	// update carries a new expiration time to the goroutine monitoring this
  	// node. The field is unexported, so encoding/json already skips it; a
  	// struct tag here would have no effect and is rejected by go vet.
  	update chan time.Time
  }
  // Response is what the store sends back to a user who issued a command.
  type Response struct {
  	Action    string `json:"action"`
  	Key       string `json:"key"`
  	Dir       bool   `json:"dir,omitempty"`
  	PrevValue string `json:"prevValue,omitempty"`
  	Value     string `json:"value,omitempty"`

  	// NewKey should be set to true when the key did not exist before the
  	// action.
  	NewKey bool `json:"newKey,omitempty"`

  	Expiration *time.Time `json:"expiration,omitempty"`

  	// TTL is the time to live, in seconds.
  	TTL int64 `json:"ttl,omitempty"`

  	// Index is the command index of the raft machine at the moment the
  	// command was executed.
  	Index uint64 `json:"index"`
  }
  // ListNode is the simplest form of a key-value pair together with its
  // type. It is only used by the list operation. The store is meant to look
  // like a file system, so a node is distinguished as a "file" or a
  // "directory".
  type ListNode struct {
  	Key, Value, Type string
  }
  // PERMANENT is the expiration time that marks a node as never expiring:
  // the Unix epoch, time.Unix(0, 0).
  var PERMANENT = time.Unix(0, 0)
  85. //------------------------------------------------------------------------------
  86. //
  87. // Methods
  88. //
  89. //------------------------------------------------------------------------------
  90. // Create a new stroe
  91. // Arguement max is the max number of response we want to record
  92. func CreateStore(max int) *Store {
  93. s := new(Store)
  94. s.messager = nil
  95. s.ResponseMap = make(map[string]*Response)
  96. s.ResponseStartIndex = 0
  97. s.ResponseMaxSize = max
  98. s.ResponseCurrSize = 0
  99. s.Tree = &tree{
  100. &treeNode{
  101. Node{
  102. "/",
  103. time.Unix(0, 0),
  104. nil,
  105. },
  106. true,
  107. make(map[string]*treeNode),
  108. },
  109. }
  110. s.watcher = newWatcherHub()
  111. return s
  112. }
  113. // Set the messager of the store
  114. func (s *Store) SetMessager(messager chan<- string) {
  115. s.messager = messager
  116. }
  117. func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  118. s.mutex.Lock()
  119. defer s.mutex.Unlock()
  120. return s.internalSet(key, value, expireTime, index)
  121. }
  122. // Set the key to value with expiration time
  123. func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  124. //Update index
  125. s.Index = index
  126. //Update stats
  127. s.BasicStats.Sets++
  128. key = path.Clean("/" + key)
  129. isExpire := !expireTime.Equal(PERMANENT)
  130. // base response
  131. resp := Response{
  132. Action: "SET",
  133. Key: key,
  134. Value: value,
  135. Index: index,
  136. }
  137. // When the slow follower receive the set command
  138. // the key may be expired, we should not add the node
  139. // also if the node exist, we need to delete the node
  140. if isExpire && expireTime.Sub(time.Now()) < 0 {
  141. return s.internalDelete(key, index)
  142. }
  143. var TTL int64
  144. // Update ttl
  145. if isExpire {
  146. TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  147. resp.Expiration = &expireTime
  148. resp.TTL = TTL
  149. }
  150. // Get the node
  151. node, ok := s.Tree.get(key)
  152. if ok {
  153. // Update when node exists
  154. // Node is not permanent
  155. if !node.ExpireTime.Equal(PERMANENT) {
  156. // If node is not permanent
  157. // Update its expireTime
  158. node.update <- expireTime
  159. } else {
  160. // If we want the permanent node to have expire time
  161. // We need to create a go routine with a channel
  162. if isExpire {
  163. node.update = make(chan time.Time)
  164. go s.monitorExpiration(key, node.update, expireTime)
  165. }
  166. }
  167. // Update the information of the node
  168. s.Tree.set(key, Node{value, expireTime, node.update})
  169. resp.PrevValue = node.Value
  170. s.watcher.notify(resp)
  171. msg, err := json.Marshal(resp)
  172. // Send to the messager
  173. if s.messager != nil && err == nil {
  174. s.messager <- string(msg)
  175. }
  176. s.addToResponseMap(index, &resp)
  177. return msg, err
  178. // Add new node
  179. } else {
  180. update := make(chan time.Time)
  181. ok := s.Tree.set(key, Node{value, expireTime, update})
  182. if !ok {
  183. return nil, etcdErr.NewError(102, "set: "+key)
  184. }
  185. if isExpire {
  186. go s.monitorExpiration(key, update, expireTime)
  187. }
  188. resp.NewKey = true
  189. msg, err := json.Marshal(resp)
  190. // Nofity the watcher
  191. s.watcher.notify(resp)
  192. // Send to the messager
  193. if s.messager != nil && err == nil {
  194. s.messager <- string(msg)
  195. }
  196. s.addToResponseMap(index, &resp)
  197. return msg, err
  198. }
  199. }
  200. // Get the value of the key and return the raw response
  201. func (s *Store) internalGet(key string) *Response {
  202. key = path.Clean("/" + key)
  203. node, ok := s.Tree.get(key)
  204. if ok {
  205. var TTL int64
  206. var isExpire bool = false
  207. isExpire = !node.ExpireTime.Equal(PERMANENT)
  208. resp := &Response{
  209. Action: "GET",
  210. Key: key,
  211. Value: node.Value,
  212. Index: s.Index,
  213. }
  214. // Update ttl
  215. if isExpire {
  216. TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  217. resp.Expiration = &node.ExpireTime
  218. resp.TTL = TTL
  219. }
  220. return resp
  221. } else {
  222. // we do not found the key
  223. return nil
  224. }
  225. }
  226. // Get all the items under key
  227. // If key is a file return the file
  228. // If key is a directory reuturn an array of files
  229. func (s *Store) Get(key string) ([]byte, error) {
  230. s.mutex.RLock()
  231. defer s.mutex.RUnlock()
  232. resps, err := s.RawGet(key)
  233. if err != nil {
  234. return nil, err
  235. }
  236. key = path.Clean("/" + key)
  237. // If the number of resps == 1 and the response key
  238. // is the key we query, a signal key-value should
  239. // be returned
  240. if len(resps) == 1 && resps[0].Key == key {
  241. return json.Marshal(resps[0])
  242. }
  243. return json.Marshal(resps)
  244. }
  245. func (s *Store) RawGet(key string) ([]*Response, error) {
  246. // Update stats
  247. s.BasicStats.Gets++
  248. key = path.Clean("/" + key)
  249. nodes, keys, ok := s.Tree.list(key)
  250. if ok {
  251. node, ok := nodes.(*Node)
  252. if ok {
  253. resps := make([]*Response, 1)
  254. isExpire := !node.ExpireTime.Equal(PERMANENT)
  255. resps[0] = &Response{
  256. Action: "GET",
  257. Index: s.Index,
  258. Key: key,
  259. Value: node.Value,
  260. }
  261. // Update ttl
  262. if isExpire {
  263. TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  264. resps[0].Expiration = &node.ExpireTime
  265. resps[0].TTL = TTL
  266. }
  267. return resps, nil
  268. }
  269. nodes, _ := nodes.([]*Node)
  270. resps := make([]*Response, len(nodes))
  271. for i := 0; i < len(nodes); i++ {
  272. var TTL int64
  273. var isExpire bool = false
  274. isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
  275. resps[i] = &Response{
  276. Action: "GET",
  277. Index: s.Index,
  278. Key: path.Join(key, keys[i]),
  279. }
  280. if len(nodes[i].Value) != 0 {
  281. resps[i].Value = nodes[i].Value
  282. } else {
  283. resps[i].Dir = true
  284. }
  285. // Update ttl
  286. if isExpire {
  287. TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
  288. resps[i].Expiration = &nodes[i].ExpireTime
  289. resps[i].TTL = TTL
  290. }
  291. }
  292. return resps, nil
  293. }
  294. return nil, etcdErr.NewError(100, "get: "+key)
  295. }
  296. func (s *Store) Delete(key string, index uint64) ([]byte, error) {
  297. s.mutex.Lock()
  298. defer s.mutex.Unlock()
  299. return s.internalDelete(key, index)
  300. }
  301. // Delete the key
  302. func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
  303. // Update stats
  304. s.BasicStats.Deletes++
  305. key = path.Clean("/" + key)
  306. // Update index
  307. s.Index = index
  308. node, ok := s.Tree.get(key)
  309. if ok {
  310. resp := Response{
  311. Action: "DELETE",
  312. Key: key,
  313. PrevValue: node.Value,
  314. Index: index,
  315. }
  316. if node.ExpireTime.Equal(PERMANENT) {
  317. s.Tree.delete(key)
  318. } else {
  319. resp.Expiration = &node.ExpireTime
  320. // Kill the expire go routine
  321. node.update <- PERMANENT
  322. s.Tree.delete(key)
  323. }
  324. msg, err := json.Marshal(resp)
  325. s.watcher.notify(resp)
  326. // notify the messager
  327. if s.messager != nil && err == nil {
  328. s.messager <- string(msg)
  329. }
  330. s.addToResponseMap(index, &resp)
  331. return msg, err
  332. } else {
  333. return nil, etcdErr.NewError(100, "delete: "+key)
  334. }
  335. }
  336. // Set the value of the key to the value if the given prevValue is equal to the value of the key
  337. func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  338. s.mutex.Lock()
  339. defer s.mutex.Unlock()
  340. // Update stats
  341. s.BasicStats.TestAndSets++
  342. resp := s.internalGet(key)
  343. if resp == nil {
  344. return nil, etcdErr.NewError(100, "testandset: "+key)
  345. }
  346. if resp.Value == prevValue {
  347. // If test success, do set
  348. return s.internalSet(key, value, expireTime, index)
  349. } else {
  350. // If fails, return err
  351. return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s",
  352. resp.Value, prevValue))
  353. }
  354. }
  355. // Add a channel to the watchHub.
  356. // The watchHub will send response to the channel when any key under the prefix
  357. // changes [since the sinceIndex if given]
  358. func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
  359. return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
  360. }
  361. // This function should be created as a go routine to delete the key-value pair
  362. // when it reaches expiration time
  363. func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
  364. duration := expireTime.Sub(time.Now())
  365. for {
  366. select {
  367. // Timeout delete the node
  368. case <-time.After(duration):
  369. node, ok := s.Tree.get(key)
  370. if !ok {
  371. return
  372. } else {
  373. s.mutex.Lock()
  374. s.Tree.delete(key)
  375. resp := Response{
  376. Action: "DELETE",
  377. Key: key,
  378. PrevValue: node.Value,
  379. Expiration: &node.ExpireTime,
  380. Index: s.Index,
  381. }
  382. s.mutex.Unlock()
  383. msg, err := json.Marshal(resp)
  384. s.watcher.notify(resp)
  385. // notify the messager
  386. if s.messager != nil && err == nil {
  387. s.messager <- string(msg)
  388. }
  389. return
  390. }
  391. case updateTime := <-update:
  392. // Update duration
  393. // If the node become a permanent one, the go routine is
  394. // not needed
  395. if updateTime.Equal(PERMANENT) {
  396. return
  397. }
  398. // Update duration
  399. duration = updateTime.Sub(time.Now())
  400. }
  401. }
  402. }
  403. // When we receive a command that will change the state of the key-value store
  404. // We will add the result of it to the ResponseMap for the use of watch command
  405. // Also we may remove the oldest response when we add new one
  406. func (s *Store) addToResponseMap(index uint64, resp *Response) {
  407. // zero case
  408. if s.ResponseMaxSize == 0 {
  409. return
  410. }
  411. strIndex := strconv.FormatUint(index, 10)
  412. s.ResponseMap[strIndex] = resp
  413. // unlimited
  414. if s.ResponseMaxSize < 0 {
  415. s.ResponseCurrSize++
  416. return
  417. }
  418. // if we reach the max point, we need to delete the most latest
  419. // response and update the startIndex
  420. if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
  421. s.ResponseStartIndex++
  422. delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
  423. } else {
  424. s.ResponseCurrSize++
  425. }
  426. }
  427. func (s *Store) clone() *Store {
  428. newStore := &Store{
  429. ResponseMaxSize: s.ResponseMaxSize,
  430. ResponseCurrSize: s.ResponseCurrSize,
  431. ResponseStartIndex: s.ResponseStartIndex,
  432. Index: s.Index,
  433. BasicStats: s.BasicStats,
  434. }
  435. newStore.Tree = s.Tree.clone()
  436. newStore.ResponseMap = make(map[string]*Response)
  437. for index, response := range s.ResponseMap {
  438. newStore.ResponseMap[index] = response
  439. }
  440. return newStore
  441. }
  442. // Save the current state of the storage system
  443. func (s *Store) Save() ([]byte, error) {
  444. // first we clone the store
  445. // json is very slow, we cannot hold the lock for such a long time
  446. s.mutex.Lock()
  447. cloneStore := s.clone()
  448. s.mutex.Unlock()
  449. b, err := json.Marshal(cloneStore)
  450. if err != nil {
  451. fmt.Println(err)
  452. return nil, err
  453. }
  454. return b, nil
  455. }
  456. // Recovery the state of the stroage system from a previous state
  457. func (s *Store) Recovery(state []byte) error {
  458. s.mutex.Lock()
  459. defer s.mutex.Unlock()
  460. // we need to stop all the current watchers
  461. // recovery will clear watcherHub
  462. s.watcher.stopWatchers()
  463. err := json.Unmarshal(state, s)
  464. // The only thing need to change after the recovery is the
  465. // node with expiration time, we need to delete all the node
  466. // that have been expired and setup go routines to monitor the
  467. // other ones
  468. s.checkExpiration()
  469. return err
  470. }
  471. // Clean the expired nodes
  472. // Set up go routines to mon
  473. func (s *Store) checkExpiration() {
  474. s.Tree.traverse(s.checkNode, false)
  475. }
  476. // Check each node
  477. func (s *Store) checkNode(key string, node *Node) {
  478. if node.ExpireTime.Equal(PERMANENT) {
  479. return
  480. } else {
  481. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  482. node.update = make(chan time.Time)
  483. go s.monitorExpiration(key, node.update, node.ExpireTime)
  484. } else {
  485. // we should delete this node
  486. s.Tree.delete(key)
  487. }
  488. }
  489. }