store.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "sort"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. etcdErr "github.com/coreos/etcd/error"
  24. )
  25. // The default version to set when the store is first initialized.
  26. const defaultVersion = 2
  27. var minExpireTime time.Time
  28. func init() {
  29. minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
  30. }
  31. type Store interface {
  32. Version() int
  33. CommandFactory() CommandFactory
  34. Index() uint64
  35. Get(nodePath string, recursive, sorted bool) (*Event, error)
  36. Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
  37. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  38. Create(nodePath string, dir bool, value string, unique bool,
  39. expireTime time.Time) (*Event, error)
  40. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  41. value string, expireTime time.Time) (*Event, error)
  42. Delete(nodePath string, recursive, dir bool) (*Event, error)
  43. CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  44. Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
  45. Save() ([]byte, error)
  46. Recovery(state []byte) error
  47. TotalTransactions() uint64
  48. JsonStats() []byte
  49. DeleteExpiredKeys(cutoff time.Time)
  50. }
// store is the in-memory implementation of Store. Its exported fields are
// what Save marshals to JSON and what Recovery unmarshals back.
type store struct {
	Root           *node       // root directory node ("/")
	WatcherHub     *watcherHub // receives an event for every successful mutation
	CurrentIndex   uint64      // incremented by each successful mutating operation
	Stats          *Stats      // operation success/failure counters
	CurrentVersion int         // store version; starts at defaultVersion
	ttlKeyHeap     *ttlKeyHeap // unexported, so not serialized; Recovery must rebuild it
	worldLock      sync.RWMutex // stop-the-world lock guarding the whole store
}
// New returns a freshly initialized, empty Store.
func New() Store {
	return newStore()
}
  63. func newStore() *store {
  64. s := new(store)
  65. s.CurrentVersion = defaultVersion
  66. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  67. s.Stats = newStats()
  68. s.WatcherHub = newWatchHub(1000)
  69. s.ttlKeyHeap = newTtlKeyHeap()
  70. return s
  71. }
// Version returns the current version of the store.
func (s *store) Version() int {
	return s.CurrentVersion
}
// Index returns the current index of the store.
// NOTE(review): CurrentIndex is read without taking worldLock — confirm that
// callers tolerate a racy read.
func (s *store) Index() uint64 {
	return s.CurrentIndex
}
// CommandFactory returns the command factory registered for the store's
// current version, as looked up by GetCommandFactory.
func (s *store) CommandFactory() CommandFactory {
	return GetCommandFactory(s.Version())
}
  84. // Get function returns a get event.
  85. // If recursive is true, it will return all the content under the node path.
  86. // If sorted is true, it will sort the content by keys.
  87. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  88. s.worldLock.RLock()
  89. defer s.worldLock.RUnlock()
  90. nodePath = path.Clean(path.Join("/", nodePath))
  91. n, err := s.internalGet(nodePath)
  92. if err != nil {
  93. s.Stats.Inc(GetFail)
  94. return nil, err
  95. }
  96. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  97. eNode := e.Node
  98. if n.IsDir() { // node is a directory
  99. eNode.Dir = true
  100. children, _ := n.List()
  101. eNode.Nodes = make(NodeExterns, len(children))
  102. // we do not use the index in the children slice directly
  103. // we need to skip the hidden one
  104. i := 0
  105. for _, child := range children {
  106. if child.IsHidden() { // get will not return hidden nodes
  107. continue
  108. }
  109. eNode.Nodes[i] = child.Repr(recursive, sorted)
  110. i++
  111. }
  112. // eliminate hidden nodes
  113. eNode.Nodes = eNode.Nodes[:i]
  114. if sorted {
  115. sort.Sort(eNode.Nodes)
  116. }
  117. } else { // node is a file
  118. eNode.Value, _ = n.Read()
  119. }
  120. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  121. s.Stats.Inc(GetSuccess)
  122. return e, nil
  123. }
  124. // Create function creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  125. // If the node has already existed, create will fail.
  126. // If any node on the path is a file, create will fail.
  127. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
  128. s.worldLock.Lock()
  129. defer s.worldLock.Unlock()
  130. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
  131. if err == nil {
  132. s.Stats.Inc(CreateSuccess)
  133. } else {
  134. s.Stats.Inc(CreateFail)
  135. }
  136. return e, err
  137. }
  138. // Set function creates or replace the node at nodePath.
  139. func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
  140. s.worldLock.Lock()
  141. defer s.worldLock.Unlock()
  142. e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
  143. if err == nil {
  144. s.Stats.Inc(SetSuccess)
  145. } else {
  146. s.Stats.Inc(SetFail)
  147. }
  148. return e, err
  149. }
  150. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  151. value string, expireTime time.Time) (*Event, error) {
  152. nodePath = path.Clean(path.Join("/", nodePath))
  153. // we do not allow the user to change "/"
  154. if nodePath == "/" {
  155. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  156. }
  157. s.worldLock.Lock()
  158. defer s.worldLock.Unlock()
  159. n, err := s.internalGet(nodePath)
  160. if err != nil {
  161. s.Stats.Inc(CompareAndSwapFail)
  162. return nil, err
  163. }
  164. if n.IsDir() { // can only compare and swap file
  165. s.Stats.Inc(CompareAndSwapFail)
  166. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  167. }
  168. // If both of the prevValue and prevIndex are given, we will test both of them.
  169. // Command will be executed, only if both of the tests are successful.
  170. if !n.Compare(prevValue, prevIndex) {
  171. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  172. s.Stats.Inc(CompareAndSwapFail)
  173. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  174. }
  175. // update etcd index
  176. s.CurrentIndex++
  177. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  178. e.PrevNode = n.Repr(false, false)
  179. eNode := e.Node
  180. // if test succeed, write the value
  181. n.Write(value, s.CurrentIndex)
  182. n.UpdateTTL(expireTime)
  183. eNode.Value = value
  184. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  185. s.WatcherHub.notify(e)
  186. s.Stats.Inc(CompareAndSwapSuccess)
  187. return e, nil
  188. }
  189. // Delete function deletes the node at the given path.
  190. // If the node is a directory, recursive must be true to delete it.
  191. func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
  192. nodePath = path.Clean(path.Join("/", nodePath))
  193. // we do not allow the user to change "/"
  194. if nodePath == "/" {
  195. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  196. }
  197. s.worldLock.Lock()
  198. defer s.worldLock.Unlock()
  199. // recursive implies dir
  200. if recursive == true {
  201. dir = true
  202. }
  203. n, err := s.internalGet(nodePath)
  204. if err != nil { // if the node does not exist, return error
  205. s.Stats.Inc(DeleteFail)
  206. return nil, err
  207. }
  208. nextIndex := s.CurrentIndex + 1
  209. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  210. e.PrevNode = n.Repr(false, false)
  211. eNode := e.Node
  212. if n.IsDir() {
  213. eNode.Dir = true
  214. }
  215. callback := func(path string) { // notify function
  216. // notify the watchers with deleted set true
  217. s.WatcherHub.notifyWatchers(e, path, true)
  218. }
  219. err = n.Remove(dir, recursive, callback)
  220. if err != nil {
  221. s.Stats.Inc(DeleteFail)
  222. return nil, err
  223. }
  224. // update etcd index
  225. s.CurrentIndex++
  226. s.WatcherHub.notify(e)
  227. s.Stats.Inc(DeleteSuccess)
  228. return e, nil
  229. }
  230. func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
  231. nodePath = path.Clean(path.Join("/", nodePath))
  232. s.worldLock.Lock()
  233. defer s.worldLock.Unlock()
  234. n, err := s.internalGet(nodePath)
  235. if err != nil { // if the node does not exist, return error
  236. s.Stats.Inc(CompareAndDeleteFail)
  237. return nil, err
  238. }
  239. if n.IsDir() { // can only compare and delete file
  240. s.Stats.Inc(CompareAndSwapFail)
  241. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  242. }
  243. // If both of the prevValue and prevIndex are given, we will test both of them.
  244. // Command will be executed, only if both of the tests are successful.
  245. if !n.Compare(prevValue, prevIndex) {
  246. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  247. s.Stats.Inc(CompareAndDeleteFail)
  248. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  249. }
  250. // update etcd index
  251. s.CurrentIndex++
  252. e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  253. e.PrevNode = n.Repr(false, false)
  254. callback := func(path string) { // notify function
  255. // notify the watchers with deleted set true
  256. s.WatcherHub.notifyWatchers(e, path, true)
  257. }
  258. // delete a key-value pair, no error should happen
  259. n.Remove(false, false, callback)
  260. s.WatcherHub.notify(e)
  261. s.Stats.Inc(CompareAndDeleteSuccess)
  262. return e, nil
  263. }
  264. func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) {
  265. key = path.Clean(path.Join("/", key))
  266. nextIndex := s.CurrentIndex + 1
  267. s.worldLock.RLock()
  268. defer s.worldLock.RUnlock()
  269. var w *Watcher
  270. var err *etcdErr.Error
  271. if sinceIndex == 0 {
  272. w, err = s.WatcherHub.watch(key, recursive, stream, nextIndex)
  273. } else {
  274. w, err = s.WatcherHub.watch(key, recursive, stream, sinceIndex)
  275. }
  276. if err != nil {
  277. // watchhub do not know the current Index
  278. // we need to attach the currentIndex here
  279. err.Index = s.CurrentIndex
  280. return nil, err
  281. }
  282. return w, nil
  283. }
  284. // walk function walks all the nodePath and apply the walkFunc on each directory
  285. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  286. components := strings.Split(nodePath, "/")
  287. curr := s.Root
  288. var err *etcdErr.Error
  289. for i := 1; i < len(components); i++ {
  290. if len(components[i]) == 0 { // ignore empty string
  291. return curr, nil
  292. }
  293. curr, err = walkFunc(curr, components[i])
  294. if err != nil {
  295. return nil, err
  296. }
  297. }
  298. return curr, nil
  299. }
  300. // Update function updates the value/ttl of the node.
  301. // If the node is a file, the value and the ttl can be updated.
  302. // If the node is a directory, only the ttl can be updated.
  303. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  304. nodePath = path.Clean(path.Join("/", nodePath))
  305. // we do not allow the user to change "/"
  306. if nodePath == "/" {
  307. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  308. }
  309. s.worldLock.Lock()
  310. defer s.worldLock.Unlock()
  311. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  312. n, err := s.internalGet(nodePath)
  313. if err != nil { // if the node does not exist, return error
  314. s.Stats.Inc(UpdateFail)
  315. return nil, err
  316. }
  317. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  318. e.PrevNode = n.Repr(false, false)
  319. eNode := e.Node
  320. if n.IsDir() && len(newValue) != 0 {
  321. // if the node is a directory, we cannot update value to non-empty
  322. s.Stats.Inc(UpdateFail)
  323. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  324. }
  325. n.Write(newValue, nextIndex)
  326. eNode.Value = newValue
  327. // update ttl
  328. n.UpdateTTL(expireTime)
  329. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  330. s.WatcherHub.notify(e)
  331. s.Stats.Inc(UpdateSuccess)
  332. s.CurrentIndex = nextIndex
  333. return e, nil
  334. }
  335. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  336. expireTime time.Time, action string) (*Event, error) {
  337. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  338. if unique { // append unique item under the node path
  339. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  340. }
  341. nodePath = path.Clean(path.Join("/", nodePath))
  342. // we do not allow the user to change "/"
  343. if nodePath == "/" {
  344. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  345. }
  346. // Assume expire times that are way in the past are not valid.
  347. // This can occur when the time is serialized to JSON and read back in.
  348. if expireTime.Before(minExpireTime) {
  349. expireTime = Permanent
  350. }
  351. dirName, nodeName := path.Split(nodePath)
  352. // walk through the nodePath, create dirs and get the last directory node
  353. d, err := s.walk(dirName, s.checkDir)
  354. if err != nil {
  355. s.Stats.Inc(SetFail)
  356. err.Index = currIndex
  357. return nil, err
  358. }
  359. e := newEvent(action, nodePath, nextIndex, nextIndex)
  360. eNode := e.Node
  361. n, _ := d.GetChild(nodeName)
  362. // force will try to replace a existing file
  363. if n != nil {
  364. if replace {
  365. if n.IsDir() {
  366. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  367. }
  368. e.PrevNode = n.Repr(false, false)
  369. n.Remove(false, false, nil)
  370. } else {
  371. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  372. }
  373. }
  374. if !dir { // create file
  375. eNode.Value = value
  376. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  377. } else { // create directory
  378. eNode.Dir = true
  379. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  380. }
  381. // we are sure d is a directory and does not have the children with name n.Name
  382. d.Add(n)
  383. // node with TTL
  384. if !n.IsPermanent() {
  385. s.ttlKeyHeap.push(n)
  386. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  387. }
  388. s.CurrentIndex = nextIndex
  389. s.WatcherHub.notify(e)
  390. return e, nil
  391. }
  392. // InternalGet function get the node of the given nodePath.
  393. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  394. nodePath = path.Clean(path.Join("/", nodePath))
  395. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  396. if !parent.IsDir() {
  397. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  398. return nil, err
  399. }
  400. child, ok := parent.Children[name]
  401. if ok {
  402. return child, nil
  403. }
  404. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  405. }
  406. f, err := s.walk(nodePath, walkFunc)
  407. if err != nil {
  408. return nil, err
  409. }
  410. return f, nil
  411. }
  412. // deleteExpiredKyes will delete all
  413. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  414. s.worldLock.Lock()
  415. defer s.worldLock.Unlock()
  416. for {
  417. node := s.ttlKeyHeap.top()
  418. if node == nil || node.ExpireTime.After(cutoff) {
  419. break
  420. }
  421. s.CurrentIndex++
  422. e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
  423. e.PrevNode = node.Repr(false, false)
  424. callback := func(path string) { // notify function
  425. // notify the watchers with deleted set true
  426. s.WatcherHub.notifyWatchers(e, path, true)
  427. }
  428. s.ttlKeyHeap.pop()
  429. node.Remove(true, true, callback)
  430. s.Stats.Inc(ExpireCount)
  431. s.WatcherHub.notify(e)
  432. }
  433. }
  434. // checkDir function will check whether the component is a directory under parent node.
  435. // If it is a directory, this function will return the pointer to that node.
  436. // If it does not exist, this function will create a new directory and return the pointer to that node.
  437. // If it is a file, this function will return error.
  438. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  439. node, ok := parent.Children[dirName]
  440. if ok {
  441. if node.IsDir() {
  442. return node, nil
  443. }
  444. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  445. }
  446. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  447. parent.Children[dirName] = n
  448. return n, nil
  449. }
  450. // Save function saves the static state of the store system.
  451. // Save function will not be able to save the state of watchers.
  452. // Save function will not save the parent field of the node. Or there will
  453. // be cyclic dependencies issue for the json package.
  454. func (s *store) Save() ([]byte, error) {
  455. s.worldLock.Lock()
  456. clonedStore := newStore()
  457. clonedStore.CurrentIndex = s.CurrentIndex
  458. clonedStore.Root = s.Root.Clone()
  459. clonedStore.WatcherHub = s.WatcherHub.clone()
  460. clonedStore.Stats = s.Stats.clone()
  461. clonedStore.CurrentVersion = s.CurrentVersion
  462. s.worldLock.Unlock()
  463. b, err := json.Marshal(clonedStore)
  464. if err != nil {
  465. return nil, err
  466. }
  467. return b, nil
  468. }
  469. // recovery function recovery the store system from a static state.
  470. // It needs to recovery the parent field of the nodes.
  471. // It needs to delete the expired nodes since the saved time and also
  472. // need to create monitor go routines.
  473. func (s *store) Recovery(state []byte) error {
  474. s.worldLock.Lock()
  475. defer s.worldLock.Unlock()
  476. err := json.Unmarshal(state, s)
  477. if err != nil {
  478. return err
  479. }
  480. s.ttlKeyHeap = newTtlKeyHeap()
  481. s.Root.recoverAndclean()
  482. return nil
  483. }
// JsonStats copies the watcher hub's current watcher count into the stats
// and returns the stats encoded by Stats.toJson.
// NOTE(review): no lock is taken here — confirm that is intentional.
func (s *store) JsonStats() []byte {
	s.Stats.Watchers = uint64(s.WatcherHub.count)
	return s.Stats.toJson()
}
// TotalTransactions returns the transaction total reported by the store's
// stats. (The misspelled callee name TotalTranscations is how the Stats
// method is invoked here.)
func (s *store) TotalTransactions() uint64 {
	return s.Stats.TotalTranscations()
}