store.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "strconv"
  19. "sync"
  20. "time"
  21. etcdErr "github.com/coreos/etcd/error"
  22. )
  23. //------------------------------------------------------------------------------
  24. //
  25. // Typedefs
  26. //
  27. //------------------------------------------------------------------------------
// Store is the main struct of the key-value store.
type Store struct {
	// Tree is the key-value store structure.
	Tree *tree

	// This mutex protects everything except adding a watcher.
	// Adding a watcher does not depend on the current state of the store,
	// and a watch returns when another protected function is called and
	// reaches the watching condition.
	// It is needed so that clone() can atomically replicate the Store
	// and do the log snapshot in a goroutine.
	mutex sync.RWMutex

	// watcher is the WatcherHub where we register all the clients
	// that issue a watch request.
	watcher *WatcherHub

	// messager is the string channel used to send messages to the outside
	// world. Currently it sends changes to the hub of the web service.
	// It may be nil, in which case nothing is sent.
	messager chan<- string

	// ResponseMap keeps the recent responses sent to clients, keyed by the
	// decimal string form of the command index.
	ResponseMap map[string]*Response

	// ResponseMaxSize is the max number of recent responses we can record.
	// Zero disables recording; a negative value means unlimited.
	ResponseMaxSize int

	// ResponseCurrSize is the current number of recent responses recorded.
	ResponseCurrSize uint

	// ResponseStartIndex is the index of the first recent response we have.
	ResponseStartIndex uint64

	// Index is the current index of the raft machine.
	Index uint64

	// BasicStats holds basic statistics of the etcd storage.
	BasicStats EtcdStats
}
  58. // A Node represents a Value in the Key-Value pair in the store
  59. // It has its value, expire time and a channel used to update the
  60. // expire time (since we do countdown in a go routine, we need to
  61. // communicate with it via channel)
  62. type Node struct {
  63. // The string value of the node
  64. Value string `json:"value"`
  65. // If the node is a permanent one the ExprieTime will be Unix(0,0)
  66. // Otherwise after the expireTime, the node will be deleted
  67. ExpireTime time.Time `json:"expireTime"`
  68. // A channel to update the expireTime of the node
  69. update chan time.Time `json:"-"`
  70. }
// Response is what the store returns to the user who issued a command.
// It is also the payload handed to watchers and, JSON-encoded, to the
// messager channel.
type Response struct {
	// Action is the command name; this file uses "SET", "GET" and "DELETE".
	Action string `json:"action"`
	// Key is the cleaned, slash-rooted key the action applied to.
	Key string `json:"key"`
	// Dir is set for entries reported as directories in a listing.
	Dir bool `json:"dir,omitempty"`
	// PrevValue is the value the key held before the action, if any.
	PrevValue string `json:"prevValue,omitempty"`
	// Value is the value of the key after the action.
	Value string `json:"value,omitempty"`

	// If the key did not exist before the action,
	// this field should be set to true.
	NewKey bool `json:"newKey,omitempty"`

	// Expiration is the absolute expire time; nil for permanent keys.
	Expiration *time.Time `json:"expiration,omitempty"`

	// TTL is the time to live in seconds.
	TTL int64 `json:"ttl,omitempty"`

	// Index is the command index of the raft machine when the command
	// was executed.
	Index uint64 `json:"index"`
}
// ListNode represents the simplest key-value pair together with its type.
// It is only used by the list operation.
// We want a file-system-like store, thus we distinguish "file"
// and "directory".
type ListNode struct {
	Key   string
	Value string
	Type  string
}
// PERMANENT is the sentinel expire time (Unix(0, 0)) carried by nodes that
// never expire. Expire times are compared against it with Equal.
var PERMANENT = time.Unix(0, 0)
  97. //------------------------------------------------------------------------------
  98. //
  99. // Methods
  100. //
  101. //------------------------------------------------------------------------------
  102. // Create a new stroe
  103. // Arguement max is the max number of response we want to record
  104. func CreateStore(max int) *Store {
  105. s := new(Store)
  106. s.messager = nil
  107. s.ResponseMap = make(map[string]*Response)
  108. s.ResponseStartIndex = 0
  109. s.ResponseMaxSize = max
  110. s.ResponseCurrSize = 0
  111. s.Tree = &tree{
  112. &treeNode{
  113. Node{
  114. "/",
  115. time.Unix(0, 0),
  116. nil,
  117. },
  118. true,
  119. make(map[string]*treeNode),
  120. },
  121. }
  122. s.watcher = newWatcherHub()
  123. return s
  124. }
  125. // Set the messager of the store
  126. func (s *Store) SetMessager(messager chan<- string) {
  127. s.messager = messager
  128. }
  129. func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  130. s.mutex.Lock()
  131. defer s.mutex.Unlock()
  132. return s.internalSet(key, value, expireTime, index)
  133. }
  134. // Set the key to value with expiration time
  135. func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  136. //Update index
  137. s.Index = index
  138. //Update stats
  139. s.BasicStats.Sets++
  140. key = path.Clean("/" + key)
  141. isExpire := !expireTime.Equal(PERMANENT)
  142. // base response
  143. resp := Response{
  144. Action: "SET",
  145. Key: key,
  146. Value: value,
  147. Index: index,
  148. }
  149. // When the slow follower receive the set command
  150. // the key may be expired, we should not add the node
  151. // also if the node exist, we need to delete the node
  152. if isExpire && expireTime.Sub(time.Now()) < 0 {
  153. return s.internalDelete(key, index)
  154. }
  155. var TTL int64
  156. // Update ttl
  157. if isExpire {
  158. TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  159. resp.Expiration = &expireTime
  160. resp.TTL = TTL
  161. }
  162. // Get the node
  163. node, ok := s.Tree.get(key)
  164. if ok {
  165. // Update when node exists
  166. // Node is not permanent
  167. if !node.ExpireTime.Equal(PERMANENT) {
  168. // If node is not permanent
  169. // Update its expireTime
  170. node.update <- expireTime
  171. } else {
  172. // If we want the permanent node to have expire time
  173. // We need to create a go routine with a channel
  174. if isExpire {
  175. node.update = make(chan time.Time)
  176. go s.monitorExpiration(key, node.update, expireTime)
  177. }
  178. }
  179. // Update the information of the node
  180. s.Tree.set(key, Node{value, expireTime, node.update})
  181. resp.PrevValue = node.Value
  182. s.watcher.notify(resp)
  183. msg, err := json.Marshal(resp)
  184. // Send to the messager
  185. if s.messager != nil && err == nil {
  186. s.messager <- string(msg)
  187. }
  188. s.addToResponseMap(index, &resp)
  189. return msg, err
  190. // Add new node
  191. } else {
  192. update := make(chan time.Time)
  193. ok := s.Tree.set(key, Node{value, expireTime, update})
  194. if !ok {
  195. return nil, etcdErr.NewError(102, "set: "+key)
  196. }
  197. if isExpire {
  198. go s.monitorExpiration(key, update, expireTime)
  199. }
  200. resp.NewKey = true
  201. msg, err := json.Marshal(resp)
  202. // Nofity the watcher
  203. s.watcher.notify(resp)
  204. // Send to the messager
  205. if s.messager != nil && err == nil {
  206. s.messager <- string(msg)
  207. }
  208. s.addToResponseMap(index, &resp)
  209. return msg, err
  210. }
  211. }
  212. // Get the value of the key and return the raw response
  213. func (s *Store) internalGet(key string) *Response {
  214. key = path.Clean("/" + key)
  215. node, ok := s.Tree.get(key)
  216. if ok {
  217. var TTL int64
  218. var isExpire bool = false
  219. isExpire = !node.ExpireTime.Equal(PERMANENT)
  220. resp := &Response{
  221. Action: "GET",
  222. Key: key,
  223. Value: node.Value,
  224. Index: s.Index,
  225. }
  226. // Update ttl
  227. if isExpire {
  228. TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  229. resp.Expiration = &node.ExpireTime
  230. resp.TTL = TTL
  231. }
  232. return resp
  233. } else {
  234. // we do not found the key
  235. return nil
  236. }
  237. }
  238. // Get all the items under key
  239. // If key is a file return the file
  240. // If key is a directory reuturn an array of files
  241. func (s *Store) Get(key string) ([]byte, error) {
  242. s.mutex.RLock()
  243. defer s.mutex.RUnlock()
  244. resps, err := s.RawGet(key)
  245. if err != nil {
  246. return nil, err
  247. }
  248. key = path.Clean("/" + key)
  249. // If the number of resps == 1 and the response key
  250. // is the key we query, a signal key-value should
  251. // be returned
  252. if len(resps) == 1 && resps[0].Key == key {
  253. return json.Marshal(resps[0])
  254. }
  255. return json.Marshal(resps)
  256. }
  257. func (s *Store) rawGetNode(key string, node *Node) ([]*Response, error) {
  258. resps := make([]*Response, 1)
  259. isExpire := !node.ExpireTime.Equal(PERMANENT)
  260. resps[0] = &Response{
  261. Action: "GET",
  262. Index: s.Index,
  263. Key: key,
  264. Value: node.Value,
  265. }
  266. // Update ttl
  267. if isExpire {
  268. TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
  269. resps[0].Expiration = &node.ExpireTime
  270. resps[0].TTL = TTL
  271. }
  272. return resps, nil
  273. }
  274. func (s *Store) rawGetNodeList(key string, keys []string, nodes []*Node) ([]*Response, error) {
  275. resps := make([]*Response, len(nodes))
  276. // TODO: check if nodes and keys are the same length
  277. for i := 0; i < len(nodes); i++ {
  278. var TTL int64
  279. var isExpire bool = false
  280. isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
  281. resps[i] = &Response{
  282. Action: "GET",
  283. Index: s.Index,
  284. Key: path.Join(key, keys[i]),
  285. }
  286. if len(nodes[i].Value) != 0 {
  287. resps[i].Value = nodes[i].Value
  288. } else {
  289. resps[i].Dir = true
  290. }
  291. // Update ttl
  292. if isExpire {
  293. TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
  294. resps[i].Expiration = &nodes[i].ExpireTime
  295. resps[i].TTL = TTL
  296. }
  297. }
  298. return resps, nil
  299. }
  300. func (s *Store) RawGet(key string) ([]*Response, error) {
  301. // Update stats
  302. s.BasicStats.Gets++
  303. key = path.Clean("/" + key)
  304. nodes, keys, ok := s.Tree.list(key)
  305. if !ok {
  306. return nil, etcdErr.NewError(100, "get: "+key)
  307. }
  308. switch node := nodes.(type) {
  309. case *Node:
  310. return s.rawGetNode(key, node)
  311. case []*Node:
  312. return s.rawGetNodeList(key, keys, node)
  313. default:
  314. panic("invalid cast ")
  315. }
  316. }
  317. func (s *Store) Delete(key string, index uint64) ([]byte, error) {
  318. s.mutex.Lock()
  319. defer s.mutex.Unlock()
  320. return s.internalDelete(key, index)
  321. }
  322. // Delete the key
  323. func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
  324. // Update stats
  325. s.BasicStats.Deletes++
  326. key = path.Clean("/" + key)
  327. // Update index
  328. s.Index = index
  329. node, ok := s.Tree.get(key)
  330. if !ok {
  331. return nil, etcdErr.NewError(100, "delete: "+key)
  332. }
  333. resp := Response{
  334. Action: "DELETE",
  335. Key: key,
  336. PrevValue: node.Value,
  337. Index: index,
  338. }
  339. if node.ExpireTime.Equal(PERMANENT) {
  340. s.Tree.delete(key)
  341. } else {
  342. resp.Expiration = &node.ExpireTime
  343. // Kill the expire go routine
  344. node.update <- PERMANENT
  345. s.Tree.delete(key)
  346. }
  347. msg, err := json.Marshal(resp)
  348. s.watcher.notify(resp)
  349. // notify the messager
  350. if s.messager != nil && err == nil {
  351. s.messager <- string(msg)
  352. }
  353. s.addToResponseMap(index, &resp)
  354. return msg, err
  355. }
  356. // Set the value of the key to the value if the given prevValue is equal to the value of the key
  357. func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
  358. s.mutex.Lock()
  359. defer s.mutex.Unlock()
  360. // Update stats
  361. s.BasicStats.TestAndSets++
  362. resp := s.internalGet(key)
  363. if resp == nil {
  364. if prevValue != "" {
  365. errmsg := fmt.Sprintf("TestAndSet: key not found and previousValue is not empty %s:%s ", key, prevValue)
  366. return nil, etcdErr.NewError(100, errmsg)
  367. }
  368. return s.internalSet(key, value, expireTime, index)
  369. }
  370. if resp.Value == prevValue {
  371. // If test succeed, do set
  372. return s.internalSet(key, value, expireTime, index)
  373. } else {
  374. // If fails, return err
  375. return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s",
  376. resp.Value, prevValue))
  377. }
  378. }
  379. // Add a channel to the watchHub.
  380. // The watchHub will send response to the channel when any key under the prefix
  381. // changes [since the sinceIndex if given]
  382. func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
  383. return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
  384. }
  385. // This function should be created as a go routine to delete the key-value pair
  386. // when it reaches expiration time
  387. func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
  388. duration := expireTime.Sub(time.Now())
  389. for {
  390. select {
  391. // Timeout delete the node
  392. case <-time.After(duration):
  393. node, ok := s.Tree.get(key)
  394. if !ok {
  395. return
  396. } else {
  397. s.mutex.Lock()
  398. s.Tree.delete(key)
  399. resp := Response{
  400. Action: "DELETE",
  401. Key: key,
  402. PrevValue: node.Value,
  403. Expiration: &node.ExpireTime,
  404. Index: s.Index,
  405. }
  406. s.mutex.Unlock()
  407. msg, err := json.Marshal(resp)
  408. s.watcher.notify(resp)
  409. // notify the messager
  410. if s.messager != nil && err == nil {
  411. s.messager <- string(msg)
  412. }
  413. return
  414. }
  415. case updateTime := <-update:
  416. // Update duration
  417. // If the node become a permanent one, the go routine is
  418. // not needed
  419. if updateTime.Equal(PERMANENT) {
  420. return
  421. }
  422. // Update duration
  423. duration = updateTime.Sub(time.Now())
  424. }
  425. }
  426. }
  427. // When we receive a command that will change the state of the key-value store
  428. // We will add the result of it to the ResponseMap for the use of watch command
  429. // Also we may remove the oldest response when we add new one
  430. func (s *Store) addToResponseMap(index uint64, resp *Response) {
  431. // zero case
  432. if s.ResponseMaxSize == 0 {
  433. return
  434. }
  435. strIndex := strconv.FormatUint(index, 10)
  436. s.ResponseMap[strIndex] = resp
  437. // unlimited
  438. if s.ResponseMaxSize < 0 {
  439. s.ResponseCurrSize++
  440. return
  441. }
  442. // if we reach the max point, we need to delete the most latest
  443. // response and update the startIndex
  444. if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
  445. s.ResponseStartIndex++
  446. delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
  447. } else {
  448. s.ResponseCurrSize++
  449. }
  450. }
  451. func (s *Store) clone() *Store {
  452. newStore := &Store{
  453. ResponseMaxSize: s.ResponseMaxSize,
  454. ResponseCurrSize: s.ResponseCurrSize,
  455. ResponseStartIndex: s.ResponseStartIndex,
  456. Index: s.Index,
  457. BasicStats: s.BasicStats,
  458. }
  459. newStore.Tree = s.Tree.clone()
  460. newStore.ResponseMap = make(map[string]*Response)
  461. for index, response := range s.ResponseMap {
  462. newStore.ResponseMap[index] = response
  463. }
  464. return newStore
  465. }
  466. // Save the current state of the storage system
  467. func (s *Store) Save() ([]byte, error) {
  468. // first we clone the store
  469. // json is very slow, we cannot hold the lock for such a long time
  470. s.mutex.Lock()
  471. cloneStore := s.clone()
  472. s.mutex.Unlock()
  473. b, err := json.Marshal(cloneStore)
  474. if err != nil {
  475. fmt.Println(err)
  476. return nil, err
  477. }
  478. return b, nil
  479. }
  480. // Recovery the state of the stroage system from a previous state
  481. func (s *Store) Recovery(state []byte) error {
  482. s.mutex.Lock()
  483. defer s.mutex.Unlock()
  484. // we need to stop all the current watchers
  485. // recovery will clear watcherHub
  486. s.watcher.stopWatchers()
  487. err := json.Unmarshal(state, s)
  488. // The only thing need to change after the recovery is the
  489. // node with expiration time, we need to delete all the node
  490. // that have been expired and setup go routines to monitor the
  491. // other ones
  492. s.checkExpiration()
  493. return err
  494. }
  495. // Clean the expired nodes
  496. // Set up go routines to mon
  497. func (s *Store) checkExpiration() {
  498. s.Tree.traverse(s.checkNode, false)
  499. }
  500. // Check each node
  501. func (s *Store) checkNode(key string, node *Node) {
  502. if node.ExpireTime.Equal(PERMANENT) {
  503. return
  504. } else {
  505. if node.ExpireTime.Sub(time.Now()) >= time.Second {
  506. node.update = make(chan time.Time)
  507. go s.monitorExpiration(key, node.update, node.ExpireTime)
  508. } else {
  509. // we should delete this node
  510. s.Tree.delete(key)
  511. }
  512. }
  513. }