store.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  23. etcdErr "github.com/coreos/etcd/error"
  24. )
  25. // The default version to set when the store is first initialized.
  26. const defaultVersion = 2
  27. var minExpireTime time.Time
  28. func init() {
  29. minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
  30. }
// Store is the interface of the key-value store. The store type in this
// file is an implementation of it; the per-method notes below describe that
// implementation.
type Store interface {
	// Version returns the current version of the store.
	Version() int
	// Index returns the current index of the store.
	Index() uint64

	// Get returns a get event for the node at nodePath.
	Get(nodePath string, recursive, sorted bool) (*Event, error)
	// Set creates or replaces the node at nodePath.
	Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
	// Update updates the value and/or TTL of an existing node.
	Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
	// Create creates a new node at nodePath and fails if it already exists.
	// If unique is true, the next store index is appended to nodePath as
	// the final path component.
	Create(nodePath string, dir bool, value string, unique bool,
		expireTime time.Time) (*Event, error)
	// CompareAndSwap writes value to the file at nodePath only if the
	// comparison against prevValue/prevIndex succeeds.
	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
		value string, expireTime time.Time) (*Event, error)
	// Delete deletes the node at nodePath; recursive implies dir.
	Delete(nodePath string, dir, recursive bool) (*Event, error)
	// CompareAndDelete deletes the file at nodePath only if the comparison
	// against prevValue/prevIndex succeeds.
	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)

	// Watch registers a watcher on prefix. A sinceIndex of 0 means
	// "starting from the next index".
	Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)

	// Save serializes the store state to JSON.
	Save() ([]byte, error)
	// Recovery restores the store state from JSON produced by Save.
	Recovery(state []byte) error

	// JsonStats returns the store statistics serialized by Stats.toJson.
	JsonStats() []byte

	// DeleteExpiredKeys removes every TTL node whose expiration time is
	// not after cutoff.
	DeleteExpiredKeys(cutoff time.Time)
}
// store is the Store implementation defined in this file.
//
// Save serializes a store value with encoding/json and Recovery unmarshals
// into one, so only the exported fields are part of the saved state.
type store struct {
	// Root is the root directory node ("/"), created in newStore.
	Root *node
	// WatcherHub receives an event for every successful mutation.
	WatcherHub *watcherHub
	// CurrentIndex is advanced by one for every successful mutation.
	CurrentIndex uint64
	// Stats holds the success/failure counters updated by the operations.
	Stats *Stats
	// CurrentVersion is set to defaultVersion in newStore.
	CurrentVersion int
	// ttlKeyHeap is consulted by DeleteExpiredKeys to find expired nodes.
	// It is unexported and therefore not serialized; Recovery replaces it
	// with a fresh heap before calling Root.recoverAndclean.
	ttlKeyHeap *ttlKeyHeap // need to recovery manually
	// worldLock guards the whole store: read operations take it shared,
	// mutating operations take it exclusively.
	worldLock sync.RWMutex // stop the world lock
	// clock is passed to the node helpers that compute expiration/TTL.
	// newStore leaves it unset; New assigns the real clock.
	clock clockwork.Clock
}
  59. func New() Store {
  60. s := newStore()
  61. s.clock = clockwork.NewRealClock()
  62. return s
  63. }
  64. func newStore() *store {
  65. s := new(store)
  66. s.CurrentVersion = defaultVersion
  67. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  68. s.Stats = newStats()
  69. s.WatcherHub = newWatchHub(1000)
  70. s.ttlKeyHeap = newTtlKeyHeap()
  71. return s
  72. }
// Version retrieves current version of the store.
// Unlike Index, it reads CurrentVersion without taking worldLock.
func (s *store) Version() int {
	return s.CurrentVersion
}
  77. // Retrieves current of the store
  78. func (s *store) Index() uint64 {
  79. s.worldLock.RLock()
  80. defer s.worldLock.RUnlock()
  81. return s.CurrentIndex
  82. }
  83. // Get returns a get event.
  84. // If recursive is true, it will return all the content under the node path.
  85. // If sorted is true, it will sort the content by keys.
  86. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  87. s.worldLock.RLock()
  88. defer s.worldLock.RUnlock()
  89. nodePath = path.Clean(path.Join("/", nodePath))
  90. n, err := s.internalGet(nodePath)
  91. if err != nil {
  92. s.Stats.Inc(GetFail)
  93. return nil, err
  94. }
  95. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  96. e.EtcdIndex = s.CurrentIndex
  97. e.Node.loadInternalNode(n, recursive, sorted, s.clock)
  98. s.Stats.Inc(GetSuccess)
  99. return e, nil
  100. }
  101. // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  102. // If the node has already existed, create will fail.
  103. // If any node on the path is a file, create will fail.
  104. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
  105. s.worldLock.Lock()
  106. defer s.worldLock.Unlock()
  107. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
  108. if err == nil {
  109. e.EtcdIndex = s.CurrentIndex
  110. s.WatcherHub.notify(e)
  111. s.Stats.Inc(CreateSuccess)
  112. } else {
  113. s.Stats.Inc(CreateFail)
  114. }
  115. return e, err
  116. }
  117. // Set creates or replace the node at nodePath.
  118. func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
  119. var err error
  120. s.worldLock.Lock()
  121. defer s.worldLock.Unlock()
  122. defer func() {
  123. if err == nil {
  124. s.Stats.Inc(SetSuccess)
  125. } else {
  126. s.Stats.Inc(SetFail)
  127. }
  128. }()
  129. // Get prevNode value
  130. n, getErr := s.internalGet(nodePath)
  131. if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
  132. err = getErr
  133. return nil, err
  134. }
  135. // Set new value
  136. e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
  137. if err != nil {
  138. return nil, err
  139. }
  140. e.EtcdIndex = s.CurrentIndex
  141. // Put prevNode into event
  142. if getErr == nil {
  143. prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  144. prev.Node.loadInternalNode(n, false, false, s.clock)
  145. e.PrevNode = prev.Node
  146. }
  147. s.WatcherHub.notify(e)
  148. return e, nil
  149. }
  150. // returns user-readable cause of failed comparison
  151. func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
  152. switch which {
  153. case CompareIndexNotMatch:
  154. return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
  155. case CompareValueNotMatch:
  156. return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
  157. default:
  158. return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  159. }
  160. }
  161. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  162. value string, expireTime time.Time) (*Event, error) {
  163. s.worldLock.Lock()
  164. defer s.worldLock.Unlock()
  165. nodePath = path.Clean(path.Join("/", nodePath))
  166. // we do not allow the user to change "/"
  167. if nodePath == "/" {
  168. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  169. }
  170. n, err := s.internalGet(nodePath)
  171. if err != nil {
  172. s.Stats.Inc(CompareAndSwapFail)
  173. return nil, err
  174. }
  175. if n.IsDir() { // can only compare and swap file
  176. s.Stats.Inc(CompareAndSwapFail)
  177. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  178. }
  179. // If both of the prevValue and prevIndex are given, we will test both of them.
  180. // Command will be executed, only if both of the tests are successful.
  181. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  182. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  183. s.Stats.Inc(CompareAndSwapFail)
  184. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  185. }
  186. // update etcd index
  187. s.CurrentIndex++
  188. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  189. e.EtcdIndex = s.CurrentIndex
  190. e.PrevNode = n.Repr(false, false, s.clock)
  191. eNode := e.Node
  192. // if test succeed, write the value
  193. n.Write(value, s.CurrentIndex)
  194. n.UpdateTTL(expireTime)
  195. // copy the value for safety
  196. valueCopy := value
  197. eNode.Value = &valueCopy
  198. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  199. s.WatcherHub.notify(e)
  200. s.Stats.Inc(CompareAndSwapSuccess)
  201. return e, nil
  202. }
  203. // Delete deletes the node at the given path.
  204. // If the node is a directory, recursive must be true to delete it.
  205. func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
  206. s.worldLock.Lock()
  207. defer s.worldLock.Unlock()
  208. nodePath = path.Clean(path.Join("/", nodePath))
  209. // we do not allow the user to change "/"
  210. if nodePath == "/" {
  211. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  212. }
  213. // recursive implies dir
  214. if recursive == true {
  215. dir = true
  216. }
  217. n, err := s.internalGet(nodePath)
  218. if err != nil { // if the node does not exist, return error
  219. s.Stats.Inc(DeleteFail)
  220. return nil, err
  221. }
  222. nextIndex := s.CurrentIndex + 1
  223. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  224. e.EtcdIndex = nextIndex
  225. e.PrevNode = n.Repr(false, false, s.clock)
  226. eNode := e.Node
  227. if n.IsDir() {
  228. eNode.Dir = true
  229. }
  230. callback := func(path string) { // notify function
  231. // notify the watchers with deleted set true
  232. s.WatcherHub.notifyWatchers(e, path, true)
  233. }
  234. err = n.Remove(dir, recursive, callback)
  235. if err != nil {
  236. s.Stats.Inc(DeleteFail)
  237. return nil, err
  238. }
  239. // update etcd index
  240. s.CurrentIndex++
  241. s.WatcherHub.notify(e)
  242. s.Stats.Inc(DeleteSuccess)
  243. return e, nil
  244. }
  245. func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
  246. nodePath = path.Clean(path.Join("/", nodePath))
  247. s.worldLock.Lock()
  248. defer s.worldLock.Unlock()
  249. n, err := s.internalGet(nodePath)
  250. if err != nil { // if the node does not exist, return error
  251. s.Stats.Inc(CompareAndDeleteFail)
  252. return nil, err
  253. }
  254. if n.IsDir() { // can only compare and delete file
  255. s.Stats.Inc(CompareAndSwapFail)
  256. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  257. }
  258. // If both of the prevValue and prevIndex are given, we will test both of them.
  259. // Command will be executed, only if both of the tests are successful.
  260. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  261. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  262. s.Stats.Inc(CompareAndDeleteFail)
  263. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  264. }
  265. // update etcd index
  266. s.CurrentIndex++
  267. e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  268. e.EtcdIndex = s.CurrentIndex
  269. e.PrevNode = n.Repr(false, false, s.clock)
  270. callback := func(path string) { // notify function
  271. // notify the watchers with deleted set true
  272. s.WatcherHub.notifyWatchers(e, path, true)
  273. }
  274. // delete a key-value pair, no error should happen
  275. n.Remove(false, false, callback)
  276. s.WatcherHub.notify(e)
  277. s.Stats.Inc(CompareAndDeleteSuccess)
  278. return e, nil
  279. }
  280. func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
  281. s.worldLock.RLock()
  282. defer s.worldLock.RUnlock()
  283. key = path.Clean(path.Join("/", key))
  284. if sinceIndex == 0 {
  285. sinceIndex = s.CurrentIndex + 1
  286. }
  287. // WatchHub does not know about the current index, so we need to pass it in
  288. w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
  289. if err != nil {
  290. return nil, err
  291. }
  292. return w, nil
  293. }
  294. // walk walks all the nodePath and apply the walkFunc on each directory
  295. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  296. components := strings.Split(nodePath, "/")
  297. curr := s.Root
  298. var err *etcdErr.Error
  299. for i := 1; i < len(components); i++ {
  300. if len(components[i]) == 0 { // ignore empty string
  301. return curr, nil
  302. }
  303. curr, err = walkFunc(curr, components[i])
  304. if err != nil {
  305. return nil, err
  306. }
  307. }
  308. return curr, nil
  309. }
  310. // Update updates the value/ttl of the node.
  311. // If the node is a file, the value and the ttl can be updated.
  312. // If the node is a directory, only the ttl can be updated.
  313. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  314. s.worldLock.Lock()
  315. defer s.worldLock.Unlock()
  316. nodePath = path.Clean(path.Join("/", nodePath))
  317. // we do not allow the user to change "/"
  318. if nodePath == "/" {
  319. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  320. }
  321. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  322. n, err := s.internalGet(nodePath)
  323. if err != nil { // if the node does not exist, return error
  324. s.Stats.Inc(UpdateFail)
  325. return nil, err
  326. }
  327. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  328. e.EtcdIndex = nextIndex
  329. e.PrevNode = n.Repr(false, false, s.clock)
  330. eNode := e.Node
  331. if n.IsDir() && len(newValue) != 0 {
  332. // if the node is a directory, we cannot update value to non-empty
  333. s.Stats.Inc(UpdateFail)
  334. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  335. }
  336. n.Write(newValue, nextIndex)
  337. if n.IsDir() {
  338. eNode.Dir = true
  339. } else {
  340. // copy the value for safety
  341. newValueCopy := newValue
  342. eNode.Value = &newValueCopy
  343. }
  344. // update ttl
  345. n.UpdateTTL(expireTime)
  346. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  347. s.WatcherHub.notify(e)
  348. s.Stats.Inc(UpdateSuccess)
  349. s.CurrentIndex = nextIndex
  350. return e, nil
  351. }
  352. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  353. expireTime time.Time, action string) (*Event, error) {
  354. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  355. if unique { // append unique item under the node path
  356. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  357. }
  358. nodePath = path.Clean(path.Join("/", nodePath))
  359. // we do not allow the user to change "/"
  360. if nodePath == "/" {
  361. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  362. }
  363. // Assume expire times that are way in the past are
  364. // This can occur when the time is serialized to JS
  365. if expireTime.Before(minExpireTime) {
  366. expireTime = Permanent
  367. }
  368. dirName, nodeName := path.Split(nodePath)
  369. // walk through the nodePath, create dirs and get the last directory node
  370. d, err := s.walk(dirName, s.checkDir)
  371. if err != nil {
  372. s.Stats.Inc(SetFail)
  373. err.Index = currIndex
  374. return nil, err
  375. }
  376. e := newEvent(action, nodePath, nextIndex, nextIndex)
  377. eNode := e.Node
  378. n, _ := d.GetChild(nodeName)
  379. // force will try to replace a existing file
  380. if n != nil {
  381. if replace {
  382. if n.IsDir() {
  383. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  384. }
  385. e.PrevNode = n.Repr(false, false, s.clock)
  386. n.Remove(false, false, nil)
  387. } else {
  388. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  389. }
  390. }
  391. if !dir { // create file
  392. // copy the value for safety
  393. valueCopy := value
  394. eNode.Value = &valueCopy
  395. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  396. } else { // create directory
  397. eNode.Dir = true
  398. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  399. }
  400. // we are sure d is a directory and does not have the children with name n.Name
  401. d.Add(n)
  402. // node with TTL
  403. if !n.IsPermanent() {
  404. s.ttlKeyHeap.push(n)
  405. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  406. }
  407. s.CurrentIndex = nextIndex
  408. return e, nil
  409. }
  410. // InternalGet gets the node of the given nodePath.
  411. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  412. nodePath = path.Clean(path.Join("/", nodePath))
  413. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  414. if !parent.IsDir() {
  415. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  416. return nil, err
  417. }
  418. child, ok := parent.Children[name]
  419. if ok {
  420. return child, nil
  421. }
  422. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  423. }
  424. f, err := s.walk(nodePath, walkFunc)
  425. if err != nil {
  426. return nil, err
  427. }
  428. return f, nil
  429. }
  430. // deleteExpiredKyes will delete all
  431. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  432. s.worldLock.Lock()
  433. defer s.worldLock.Unlock()
  434. for {
  435. node := s.ttlKeyHeap.top()
  436. if node == nil || node.ExpireTime.After(cutoff) {
  437. break
  438. }
  439. s.CurrentIndex++
  440. e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
  441. e.EtcdIndex = s.CurrentIndex
  442. e.PrevNode = node.Repr(false, false, s.clock)
  443. callback := func(path string) { // notify function
  444. // notify the watchers with deleted set true
  445. s.WatcherHub.notifyWatchers(e, path, true)
  446. }
  447. s.ttlKeyHeap.pop()
  448. node.Remove(true, true, callback)
  449. s.Stats.Inc(ExpireCount)
  450. s.WatcherHub.notify(e)
  451. }
  452. }
  453. // checkDir will check whether the component is a directory under parent node.
  454. // If it is a directory, this function will return the pointer to that node.
  455. // If it does not exist, this function will create a new directory and return the pointer to that node.
  456. // If it is a file, this function will return error.
  457. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  458. node, ok := parent.Children[dirName]
  459. if ok {
  460. if node.IsDir() {
  461. return node, nil
  462. }
  463. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  464. }
  465. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  466. parent.Children[dirName] = n
  467. return n, nil
  468. }
  469. // Save saves the static state of the store system.
  470. // It will not be able to save the state of watchers.
  471. // It will not save the parent field of the node. Or there will
  472. // be cyclic dependencies issue for the json package.
  473. func (s *store) Save() ([]byte, error) {
  474. s.worldLock.Lock()
  475. clonedStore := newStore()
  476. clonedStore.CurrentIndex = s.CurrentIndex
  477. clonedStore.Root = s.Root.Clone()
  478. clonedStore.WatcherHub = s.WatcherHub.clone()
  479. clonedStore.Stats = s.Stats.clone()
  480. clonedStore.CurrentVersion = s.CurrentVersion
  481. s.worldLock.Unlock()
  482. b, err := json.Marshal(clonedStore)
  483. if err != nil {
  484. return nil, err
  485. }
  486. return b, nil
  487. }
  488. // Recovery recovers the store system from a static state
  489. // It needs to recover the parent field of the nodes.
  490. // It needs to delete the expired nodes since the saved time and also
  491. // needs to create monitoring go routines.
  492. func (s *store) Recovery(state []byte) error {
  493. s.worldLock.Lock()
  494. defer s.worldLock.Unlock()
  495. err := json.Unmarshal(state, s)
  496. if err != nil {
  497. return err
  498. }
  499. s.ttlKeyHeap = newTtlKeyHeap()
  500. s.Root.recoverAndclean()
  501. return nil
  502. }
  503. func (s *store) JsonStats() []byte {
  504. s.Stats.Watchers = uint64(s.WatcherHub.count)
  505. return s.Stats.toJson()
  506. }