store.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. etcdErr "github.com/coreos/etcd/error"
  6. "path"
  7. "strconv"
  8. "sync"
  9. "time"
  10. )
  11. //------------------------------------------------------------------------------
  12. //
  13. // Typedefs
  14. //
  15. //------------------------------------------------------------------------------
// Store is the main struct of the key-value store.
type Store struct {

	// Tree is the key-value tree that holds the stored nodes.
	Tree *tree

	// This mutex protects everything except adding a watcher.
	// Adding a watcher does not depend on the current state of the store;
	// the watch returns once another protected function is called and the
	// watching condition is reached.
	// The mutex is needed so that clone() can atomically replicate the Store
	// and the log snapshot can then be taken in a goroutine.
	mutex sync.RWMutex

	// watcher is the hub where all clients that issue a watch request
	// are registered.
	watcher *WatcherHub

	// messager is the string channel used to send messages to the outside
	// world. It is currently used to push changes to the hub of the web
	// service. A nil channel disables publishing.
	messager chan<- string

	// ResponseMap keeps the recent responses, keyed by the decimal string
	// form of the command index.
	ResponseMap map[string]*Response

	// ResponseMaxSize is the maximum number of recent responses to record.
	// addToResponseMap treats 0 as "record nothing" and a negative value
	// as "unlimited".
	ResponseMaxSize int

	// ResponseCurrSize is the number of recent responses currently recorded.
	ResponseCurrSize uint

	// ResponseStartIndex is the index of the first recent response we have.
	ResponseStartIndex uint64

	// Index is the current index of the raft machine.
	Index uint64

	// BasicStats holds basic statistics about the etcd storage.
	BasicStats EtcdStats
}
  46. // A Node represents a Value in the Key-Value pair in the store
  47. // It has its value, expire time and a channel used to update the
  48. // expire time (since we do countdown in a go routine, we need to
  49. // communicate with it via channel)
  50. type Node struct {
  51. // The string value of the node
  52. Value string `json:"value"`
  53. // If the node is a permanent one the ExprieTime will be Unix(0,0)
  54. // Otherwise after the expireTime, the node will be deleted
  55. ExpireTime time.Time `json:"expireTime"`
  56. // A channel to update the expireTime of the node
  57. update chan time.Time `json:"-"`
  58. }
// Response is what the store returns to the user who issued a command.
// It is serialized to JSON with the field names given in the struct tags.
type Response struct {
	// Action is the name of the executed command ("SET", "GET", "DELETE").
	Action string `json:"action"`
	// Key is the cleaned, slash-prefixed key the command operated on.
	Key string `json:"key"`
	// Dir is set for directory entries in a listing.
	Dir bool `json:"dir,omitempty"`
	// PrevValue is the value the key held before the command.
	PrevValue string `json:"prevValue,omitempty"`
	// Value is the value of the key.
	Value string `json:"value,omitempty"`

	// If the key did not exist before the action,
	// this field should be set to true.
	NewKey bool `json:"newKey,omitempty"`

	// Expiration is the expire time of the key; nil for permanent keys.
	Expiration *time.Time `json:"expiration,omitempty"`

	// TTL is the time to live in seconds.
	TTL int64 `json:"ttl,omitempty"`

	// Index is the command index of the raft machine when the command
	// was executed.
	Index uint64 `json:"index"`
}
// A ListNode represents the simplest key-value pair together with its type.
// It is only used by the list operation.
// The store is meant to look like a file system, so we distinguish "file"
// and "directory".
type ListNode struct {
	Key   string
	Value string
	Type  string
}
// PERMANENT is the expire time of a node that never expires (Unix(0, 0)).
// Expire times are compared against it with Equal to tell permanent nodes
// from expiring ones.
var PERMANENT = time.Unix(0, 0)
  85. //------------------------------------------------------------------------------
  86. //
  87. // Methods
  88. //
  89. //------------------------------------------------------------------------------
  90. // Create a new stroe
  91. // Arguement max is the max number of response we want to record
  92. func CreateStore(max int) *Store {
  93. s := new(Store)
  94. s.messager = nil
  95. s.ResponseMap = make(map[string]*Response)
  96. s.ResponseStartIndex = 0
  97. s.ResponseMaxSize = max
  98. s.ResponseCurrSize = 0
  99. s.Tree = &tree{
  100. &treeNode{
  101. Node{
  102. "/",
  103. time.Unix(0, 0),
  104. nil,
  105. },
  106. true,
  107. make(map[string]*treeNode),
  108. },
  109. }
  110. s.watcher = newWatcherHub()
  111. return s
  112. }
// SetMessager sets the messager of the store: the channel on which the
// JSON form of every state-changing response is published. Passing nil
// disables publishing.
// The field is assigned without taking the store mutex.
func (s *Store) SetMessager(messager chan<- string) {
	s.messager = messager
}
// Set sets key to value with the given expire time at raft index `index`.
// It takes the store's write lock for the whole operation and delegates to
// internalSet; the JSON-encoded response (or an error) is returned.
func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	return s.internalSet(key, value, expireTime, index)

}
  122. // Set the key to value with expiration time
  123. func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  124. //Update index
  125. s.Index = index
  126. //Update stats
  127. s.BasicStats.Sets++
  128. key = path.Clean("/" + key)
  129. isExpire := !expireTime.Equal(PERMANENT)
  130. // base response
  131. resp := Response{
  132. Action: "SET",
  133. Key: key,
  134. Value: value,
  135. Index: index,
  136. }
  137. // When the slow follower receive the set command
  138. // the key may be expired, we should not add the node
  139. // also if the node exist, we need to delete the node
  140. if isExpire && expireTime.Sub(time.Now()) < 0 {
  141. return s.internalDelete(key, index)
  142. }
  143. var TTL int64
  144. // Update ttl
  145. if isExpire {
  146. TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  147. resp.Expiration = &expireTime
  148. resp.TTL = TTL
  149. }
  150. // Get the node
  151. node, ok := s.Tree.get(key)
  152. if ok {
  153. // Update when node exists
  154. // Node is not permanent
  155. if !node.ExpireTime.Equal(PERMANENT) {
  156. // If node is not permanent
  157. // Update its expireTime
  158. node.update <- expireTime
  159. } else {
  160. // If we want the permanent node to have expire time
  161. // We need to create a go routine with a channel
  162. if isExpire {
  163. node.update = make(chan time.Time)
  164. go s.monitorExpiration(key, node.update, expireTime)
  165. }
  166. }
  167. // Update the information of the node
  168. s.Tree.set(key, Node{value, expireTime, node.update})
  169. resp.PrevValue = node.Value
  170. s.watcher.notify(resp)
  171. msg, err := json.Marshal(resp)
  172. // Send to the messager
  173. if s.messager != nil && err == nil {
  174. s.messager <- string(msg)
  175. }
  176. s.addToResponseMap(index, &resp)
  177. return msg, err
  178. // Add new node
  179. } else {
  180. update := make(chan time.Time)
  181. ok := s.Tree.set(key, Node{value, expireTime, update})
  182. if !ok {
  183. return nil, etcdErr.NewError(102, "set: "+key)
  184. }
  185. if isExpire {
  186. go s.monitorExpiration(key, update, expireTime)
  187. }
  188. resp.NewKey = true
  189. msg, err := json.Marshal(resp)
  190. // Nofity the watcher
  191. s.watcher.notify(resp)
  192. // Send to the messager
  193. if s.messager != nil && err == nil {
  194. s.messager <- string(msg)
  195. }
  196. s.addToResponseMap(index, &resp)
  197. return msg, err
  198. }
  199. }
  200. // Get the value of the key and return the raw response
  201. func (s *Store) internalGet(key string) *Response {
  202. key = path.Clean("/" + key)
  203. node, ok := s.Tree.get(key)
  204. if ok {
  205. var TTL int64
  206. var isExpire bool = false
  207. isExpire = !node.ExpireTime.Equal(PERMANENT)
  208. resp := &Response{
  209. Action: "GET",
  210. Key: key,
  211. Value: node.Value,
  212. Index: s.Index,
  213. }
  214. // Update ttl
  215. if isExpire {
  216. TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  217. resp.Expiration = &node.ExpireTime
  218. resp.TTL = TTL
  219. }
  220. return resp
  221. } else {
  222. // we do not found the key
  223. return nil
  224. }
  225. }
  226. // Get all the items under key
  227. // If key is a file return the file
  228. // If key is a directory reuturn an array of files
  229. func (s *Store) Get(key string) ([]byte, error) {
  230. s.mutex.RLock()
  231. defer s.mutex.RUnlock()
  232. resps, err := s.RawGet(key)
  233. if err != nil {
  234. return nil, err
  235. }
  236. key = path.Clean("/" + key)
  237. // If the number of resps == 1 and the response key
  238. // is the key we query, a signal key-value should
  239. // be returned
  240. if len(resps) == 1 && resps[0].Key == key {
  241. return json.Marshal(resps[0])
  242. }
  243. return json.Marshal(resps)
  244. }
  245. func (s *Store) rawGetNode(key string, node *Node) ([]*Response, error) {
  246. resps := make([]*Response, 1)
  247. isExpire := !node.ExpireTime.Equal(PERMANENT)
  248. resps[0] = &Response{
  249. Action: "GET",
  250. Index: s.Index,
  251. Key: key,
  252. Value: node.Value,
  253. }
  254. // Update ttl
  255. if isExpire {
  256. TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  257. resps[0].Expiration = &node.ExpireTime
  258. resps[0].TTL = TTL
  259. }
  260. return resps, nil
  261. }
  262. func (s *Store) rawGetNodeList(key string, keys []string, nodes []*Node) ([]*Response, error) {
  263. resps := make([]*Response, len(nodes))
  264. // TODO: check if nodes and keys are the same length
  265. for i := 0; i < len(nodes); i++ {
  266. var TTL int64
  267. var isExpire bool = false
  268. isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
  269. resps[i] = &Response{
  270. Action: "GET",
  271. Index: s.Index,
  272. Key: path.Join(key, keys[i]),
  273. }
  274. if len(nodes[i].Value) != 0 {
  275. resps[i].Value = nodes[i].Value
  276. } else {
  277. resps[i].Dir = true
  278. }
  279. // Update ttl
  280. if isExpire {
  281. TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
  282. resps[i].Expiration = &nodes[i].ExpireTime
  283. resps[i].TTL = TTL
  284. }
  285. }
  286. return resps, nil
  287. }
  288. func (s *Store) RawGet(key string) ([]*Response, error) {
  289. // Update stats
  290. s.BasicStats.Gets++
  291. key = path.Clean("/" + key)
  292. nodes, keys, ok := s.Tree.list(key)
  293. if !ok {
  294. return nil, etcdErr.NewError(100, "get: "+key)
  295. }
  296. switch node := nodes.(type) {
  297. case *Node:
  298. return s.rawGetNode(key, node)
  299. case []*Node:
  300. return s.rawGetNodeList(key, keys, node)
  301. default:
  302. panic("invalid cast ")
  303. }
  304. }
// Delete removes key at raft index `index`.
// It takes the store's write lock for the whole operation and delegates to
// internalDelete; the JSON-encoded response (or an error) is returned.
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.internalDelete(key, index)
}
  310. // Delete the key
  311. func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
  312. // Update stats
  313. s.BasicStats.Deletes++
  314. key = path.Clean("/" + key)
  315. // Update index
  316. s.Index = index
  317. node, ok := s.Tree.get(key)
  318. if !ok {
  319. return nil, etcdErr.NewError(100, "delete: "+key)
  320. }
  321. resp := Response{
  322. Action: "DELETE",
  323. Key: key,
  324. PrevValue: node.Value,
  325. Index: index,
  326. }
  327. if node.ExpireTime.Equal(PERMANENT) {
  328. s.Tree.delete(key)
  329. } else {
  330. resp.Expiration = &node.ExpireTime
  331. // Kill the expire go routine
  332. node.update <- PERMANENT
  333. s.Tree.delete(key)
  334. }
  335. msg, err := json.Marshal(resp)
  336. s.watcher.notify(resp)
  337. // notify the messager
  338. if s.messager != nil && err == nil {
  339. s.messager <- string(msg)
  340. }
  341. s.addToResponseMap(index, &resp)
  342. return msg, err
  343. }
  344. // Set the value of the key to the value if the given prevValue is equal to the value of the key
  345. func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  346. s.mutex.Lock()
  347. defer s.mutex.Unlock()
  348. // Update stats
  349. s.BasicStats.TestAndSets++
  350. resp := s.internalGet(key)
  351. if resp == nil {
  352. if prevValue != "" {
  353. errmsg := fmt.Sprintf("TestAndSet: key not found and previousValue is not empty %s:%s ", key, prevValue)
  354. return nil, etcdErr.NewError(100, errmsg)
  355. }
  356. return s.internalSet(key, value, expireTime, index)
  357. }
  358. if resp.Value == prevValue {
  359. // If test succeed, do set
  360. return s.internalSet(key, value, expireTime, index)
  361. } else {
  362. // If fails, return err
  363. return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s",
  364. resp.Value, prevValue))
  365. }
  366. }
// AddWatcher adds a watcher to the watcher hub.
// The hub will send a response to the watcher when any key under the prefix
// changes [since the sinceIndex if given].
// The current ResponseStartIndex, Index and ResponseMap of the store are
// handed to the hub; they are read without taking the store mutex.
func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
	return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
}
  373. // This function should be created as a go routine to delete the key-value pair
  374. // when it reaches expiration time
  375. func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
  376. duration := expireTime.Sub(time.Now())
  377. for {
  378. select {
  379. // Timeout delete the node
  380. case <-time.After(duration):
  381. node, ok := s.Tree.get(key)
  382. if !ok {
  383. return
  384. } else {
  385. s.mutex.Lock()
  386. s.Tree.delete(key)
  387. resp := Response{
  388. Action: "DELETE",
  389. Key: key,
  390. PrevValue: node.Value,
  391. Expiration: &node.ExpireTime,
  392. Index: s.Index,
  393. }
  394. s.mutex.Unlock()
  395. msg, err := json.Marshal(resp)
  396. s.watcher.notify(resp)
  397. // notify the messager
  398. if s.messager != nil && err == nil {
  399. s.messager <- string(msg)
  400. }
  401. return
  402. }
  403. case updateTime := <-update:
  404. // Update duration
  405. // If the node become a permanent one, the go routine is
  406. // not needed
  407. if updateTime.Equal(PERMANENT) {
  408. return
  409. }
  410. // Update duration
  411. duration = updateTime.Sub(time.Now())
  412. }
  413. }
  414. }
  415. // When we receive a command that will change the state of the key-value store
  416. // We will add the result of it to the ResponseMap for the use of watch command
  417. // Also we may remove the oldest response when we add new one
  418. func (s *Store) addToResponseMap(index uint64, resp *Response) {
  419. // zero case
  420. if s.ResponseMaxSize == 0 {
  421. return
  422. }
  423. strIndex := strconv.FormatUint(index, 10)
  424. s.ResponseMap[strIndex] = resp
  425. // unlimited
  426. if s.ResponseMaxSize < 0 {
  427. s.ResponseCurrSize++
  428. return
  429. }
  430. // if we reach the max point, we need to delete the most latest
  431. // response and update the startIndex
  432. if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
  433. s.ResponseStartIndex++
  434. delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
  435. } else {
  436. s.ResponseCurrSize++
  437. }
  438. }
  439. func (s *Store) clone() *Store {
  440. newStore := &Store{
  441. ResponseMaxSize: s.ResponseMaxSize,
  442. ResponseCurrSize: s.ResponseCurrSize,
  443. ResponseStartIndex: s.ResponseStartIndex,
  444. Index: s.Index,
  445. BasicStats: s.BasicStats,
  446. }
  447. newStore.Tree = s.Tree.clone()
  448. newStore.ResponseMap = make(map[string]*Response)
  449. for index, response := range s.ResponseMap {
  450. newStore.ResponseMap[index] = response
  451. }
  452. return newStore
  453. }
  454. // Save the current state of the storage system
  455. func (s *Store) Save() ([]byte, error) {
  456. // first we clone the store
  457. // json is very slow, we cannot hold the lock for such a long time
  458. s.mutex.Lock()
  459. cloneStore := s.clone()
  460. s.mutex.Unlock()
  461. b, err := json.Marshal(cloneStore)
  462. if err != nil {
  463. fmt.Println(err)
  464. return nil, err
  465. }
  466. return b, nil
  467. }
  468. // Recovery the state of the stroage system from a previous state
  469. func (s *Store) Recovery(state []byte) error {
  470. s.mutex.Lock()
  471. defer s.mutex.Unlock()
  472. // we need to stop all the current watchers
  473. // recovery will clear watcherHub
  474. s.watcher.stopWatchers()
  475. err := json.Unmarshal(state, s)
  476. // The only thing need to change after the recovery is the
  477. // node with expiration time, we need to delete all the node
  478. // that have been expired and setup go routines to monitor the
  479. // other ones
  480. s.checkExpiration()
  481. return err
  482. }
// checkExpiration cleans up the expired nodes and sets up goroutines to
// monitor the nodes that are going to expire, by running checkNode over
// the tree.
// NOTE(review): the meaning of the second traverse argument (false) is not
// visible here — confirm against the tree implementation.
func (s *Store) checkExpiration() {
	s.Tree.traverse(s.checkNode, false)
}
  488. // Check each node
  489. func (s *Store) checkNode(key string, node *Node) {
  490. if node.ExpireTime.Equal(PERMANENT) {
  491. return
  492. } else {
  493. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  494. node.update = make(chan time.Time)
  495. go s.monitorExpiration(key, node.update, node.ExpireTime)
  496. } else {
  497. // we should delete this node
  498. s.Tree.delete(key)
  499. }
  500. }
  501. }