store.go 18 KB


  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package store
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "path"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  24. etcdErr "github.com/coreos/etcd/error"
  25. "github.com/coreos/etcd/pkg/types"
  26. )
  // defaultVersion is the version assigned to a store when it is first initialized.
  const defaultVersion = 2

  // minExpireTime is the earliest expiration time that is treated as meaningful;
  // internalCreate replaces any expiration before it with Permanent.
  //
  // It is 2000-01-01T00:00:00Z, built with time.Date so that no parse error has
  // to be discarded and no init function is required.
  var minExpireTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
  33. type Store interface {
  34. Version() int
  35. Index() uint64
  36. Get(nodePath string, recursive, sorted bool) (*Event, error)
  37. Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
  38. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  39. Create(nodePath string, dir bool, value string, unique bool,
  40. expireTime time.Time) (*Event, error)
  41. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  42. value string, expireTime time.Time) (*Event, error)
  43. Delete(nodePath string, dir, recursive bool) (*Event, error)
  44. CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  45. Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
  46. Save() ([]byte, error)
  47. Recovery(state []byte) error
  48. Clone() Store
  49. SaveNoCopy() ([]byte, error)
  50. JsonStats() []byte
  51. DeleteExpiredKeys(cutoff time.Time)
  52. }
  53. type store struct {
  54. Root *node
  55. WatcherHub *watcherHub
  56. CurrentIndex uint64
  57. Stats *Stats
  58. CurrentVersion int
  59. ttlKeyHeap *ttlKeyHeap // need to recovery manually
  60. worldLock sync.RWMutex // stop the world lock
  61. clock clockwork.Clock
  62. readonlySet types.Set
  63. }
  64. // The given namespaces will be created as initial directories in the returned store.
  65. func New(namespaces ...string) Store {
  66. s := newStore(namespaces...)
  67. s.clock = clockwork.NewRealClock()
  68. return s
  69. }
  70. func newStore(namespaces ...string) *store {
  71. s := new(store)
  72. s.CurrentVersion = defaultVersion
  73. s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent)
  74. for _, namespace := range namespaces {
  75. s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent))
  76. }
  77. s.Stats = newStats()
  78. s.WatcherHub = newWatchHub(1000)
  79. s.ttlKeyHeap = newTtlKeyHeap()
  80. s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
  81. return s
  82. }
  83. // Version retrieves current version of the store.
  84. func (s *store) Version() int {
  85. return s.CurrentVersion
  86. }
  87. // Retrieves current of the store
  88. func (s *store) Index() uint64 {
  89. s.worldLock.RLock()
  90. defer s.worldLock.RUnlock()
  91. return s.CurrentIndex
  92. }
  93. // Get returns a get event.
  94. // If recursive is true, it will return all the content under the node path.
  95. // If sorted is true, it will sort the content by keys.
  96. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  97. var err *etcdErr.Error
  98. s.worldLock.Lock()
  99. defer s.worldLock.Unlock()
  100. defer func() {
  101. if err == nil {
  102. s.Stats.Inc(GetSuccess)
  103. if recursive {
  104. reportReadSuccess(GetRecursive)
  105. } else {
  106. reportReadSuccess(Get)
  107. }
  108. return
  109. }
  110. s.Stats.Inc(GetFail)
  111. if recursive {
  112. reportReadFailure(GetRecursive)
  113. } else {
  114. reportReadFailure(Get)
  115. }
  116. }()
  117. nodePath = path.Clean(path.Join("/", nodePath))
  118. n, err := s.internalGet(nodePath)
  119. if err != nil {
  120. return nil, err
  121. }
  122. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  123. e.EtcdIndex = s.CurrentIndex
  124. e.Node.loadInternalNode(n, recursive, sorted, s.clock)
  125. return e, nil
  126. }
  127. // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  128. // If the node has already existed, create will fail.
  129. // If any node on the path is a file, create will fail.
  130. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
  131. var err *etcdErr.Error
  132. s.worldLock.Lock()
  133. defer s.worldLock.Unlock()
  134. defer func() {
  135. if err == nil {
  136. s.Stats.Inc(CreateSuccess)
  137. reportWriteSuccess(Create)
  138. return
  139. }
  140. s.Stats.Inc(CreateFail)
  141. reportWriteFailure(Create)
  142. }()
  143. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
  144. if err != nil {
  145. return nil, err
  146. }
  147. e.EtcdIndex = s.CurrentIndex
  148. s.WatcherHub.notify(e)
  149. return e, nil
  150. }
  151. // Set creates or replace the node at nodePath.
  152. func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
  153. var err *etcdErr.Error
  154. s.worldLock.Lock()
  155. defer s.worldLock.Unlock()
  156. defer func() {
  157. if err == nil {
  158. s.Stats.Inc(SetSuccess)
  159. reportWriteSuccess(Set)
  160. return
  161. }
  162. s.Stats.Inc(SetFail)
  163. reportWriteFailure(Set)
  164. }()
  165. // Get prevNode value
  166. n, getErr := s.internalGet(nodePath)
  167. if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
  168. err = getErr
  169. return nil, err
  170. }
  171. // Set new value
  172. e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
  173. if err != nil {
  174. return nil, err
  175. }
  176. e.EtcdIndex = s.CurrentIndex
  177. // Put prevNode into event
  178. if getErr == nil {
  179. prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  180. prev.Node.loadInternalNode(n, false, false, s.clock)
  181. e.PrevNode = prev.Node
  182. }
  183. s.WatcherHub.notify(e)
  184. return e, nil
  185. }
  186. // returns user-readable cause of failed comparison
  187. func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
  188. switch which {
  189. case CompareIndexNotMatch:
  190. return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
  191. case CompareValueNotMatch:
  192. return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
  193. default:
  194. return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  195. }
  196. }
  197. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  198. value string, expireTime time.Time) (*Event, error) {
  199. var err *etcdErr.Error
  200. s.worldLock.Lock()
  201. defer s.worldLock.Unlock()
  202. defer func() {
  203. if err == nil {
  204. s.Stats.Inc(CompareAndSwapSuccess)
  205. reportWriteSuccess(CompareAndSwap)
  206. return
  207. }
  208. s.Stats.Inc(CompareAndSwapFail)
  209. reportWriteFailure(CompareAndSwap)
  210. }()
  211. nodePath = path.Clean(path.Join("/", nodePath))
  212. // we do not allow the user to change "/"
  213. if s.readonlySet.Contains(nodePath) {
  214. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  215. }
  216. n, err := s.internalGet(nodePath)
  217. if err != nil {
  218. return nil, err
  219. }
  220. if n.IsDir() { // can only compare and swap file
  221. err = etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  222. return nil, err
  223. }
  224. // If both of the prevValue and prevIndex are given, we will test both of them.
  225. // Command will be executed, only if both of the tests are successful.
  226. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  227. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  228. err = etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  229. return nil, err
  230. }
  231. // update etcd index
  232. s.CurrentIndex++
  233. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  234. e.EtcdIndex = s.CurrentIndex
  235. e.PrevNode = n.Repr(false, false, s.clock)
  236. eNode := e.Node
  237. // if test succeed, write the value
  238. n.Write(value, s.CurrentIndex)
  239. n.UpdateTTL(expireTime)
  240. // copy the value for safety
  241. valueCopy := value
  242. eNode.Value = &valueCopy
  243. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  244. s.WatcherHub.notify(e)
  245. return e, nil
  246. }
  247. // Delete deletes the node at the given path.
  248. // If the node is a directory, recursive must be true to delete it.
  249. func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
  250. var err *etcdErr.Error
  251. s.worldLock.Lock()
  252. defer s.worldLock.Unlock()
  253. defer func() {
  254. if err == nil {
  255. s.Stats.Inc(DeleteSuccess)
  256. reportWriteSuccess(Delete)
  257. return
  258. }
  259. s.Stats.Inc(DeleteFail)
  260. reportWriteFailure(Delete)
  261. }()
  262. nodePath = path.Clean(path.Join("/", nodePath))
  263. // we do not allow the user to change "/"
  264. if s.readonlySet.Contains(nodePath) {
  265. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  266. }
  267. // recursive implies dir
  268. if recursive == true {
  269. dir = true
  270. }
  271. n, err := s.internalGet(nodePath)
  272. if err != nil { // if the node does not exist, return error
  273. return nil, err
  274. }
  275. nextIndex := s.CurrentIndex + 1
  276. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  277. e.EtcdIndex = nextIndex
  278. e.PrevNode = n.Repr(false, false, s.clock)
  279. eNode := e.Node
  280. if n.IsDir() {
  281. eNode.Dir = true
  282. }
  283. callback := func(path string) { // notify function
  284. // notify the watchers with deleted set true
  285. s.WatcherHub.notifyWatchers(e, path, true)
  286. }
  287. err = n.Remove(dir, recursive, callback)
  288. if err != nil {
  289. return nil, err
  290. }
  291. // update etcd index
  292. s.CurrentIndex++
  293. s.WatcherHub.notify(e)
  294. return e, nil
  295. }
  296. func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
  297. var err *etcdErr.Error
  298. s.worldLock.Lock()
  299. defer s.worldLock.Unlock()
  300. defer func() {
  301. if err == nil {
  302. s.Stats.Inc(CompareAndDeleteSuccess)
  303. reportWriteSuccess(CompareAndDelete)
  304. return
  305. }
  306. s.Stats.Inc(CompareAndDeleteFail)
  307. reportWriteFailure(CompareAndDelete)
  308. }()
  309. nodePath = path.Clean(path.Join("/", nodePath))
  310. n, err := s.internalGet(nodePath)
  311. if err != nil { // if the node does not exist, return error
  312. return nil, err
  313. }
  314. if n.IsDir() { // can only compare and delete file
  315. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  316. }
  317. // If both of the prevValue and prevIndex are given, we will test both of them.
  318. // Command will be executed, only if both of the tests are successful.
  319. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  320. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  321. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  322. }
  323. // update etcd index
  324. s.CurrentIndex++
  325. e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  326. e.EtcdIndex = s.CurrentIndex
  327. e.PrevNode = n.Repr(false, false, s.clock)
  328. callback := func(path string) { // notify function
  329. // notify the watchers with deleted set true
  330. s.WatcherHub.notifyWatchers(e, path, true)
  331. }
  332. err = n.Remove(false, false, callback)
  333. if err != nil {
  334. return nil, err
  335. }
  336. s.WatcherHub.notify(e)
  337. return e, nil
  338. }
  339. func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
  340. s.worldLock.RLock()
  341. defer s.worldLock.RUnlock()
  342. key = path.Clean(path.Join("/", key))
  343. if sinceIndex == 0 {
  344. sinceIndex = s.CurrentIndex + 1
  345. }
  346. // WatchHub does not know about the current index, so we need to pass it in
  347. w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
  348. if err != nil {
  349. return nil, err
  350. }
  351. return w, nil
  352. }
  353. // walk walks all the nodePath and apply the walkFunc on each directory
  354. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  355. components := strings.Split(nodePath, "/")
  356. curr := s.Root
  357. var err *etcdErr.Error
  358. for i := 1; i < len(components); i++ {
  359. if len(components[i]) == 0 { // ignore empty string
  360. return curr, nil
  361. }
  362. curr, err = walkFunc(curr, components[i])
  363. if err != nil {
  364. return nil, err
  365. }
  366. }
  367. return curr, nil
  368. }
  369. // Update updates the value/ttl of the node.
  370. // If the node is a file, the value and the ttl can be updated.
  371. // If the node is a directory, only the ttl can be updated.
  372. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  373. var err *etcdErr.Error
  374. s.worldLock.Lock()
  375. defer s.worldLock.Unlock()
  376. defer func() {
  377. if err == nil {
  378. s.Stats.Inc(UpdateSuccess)
  379. reportWriteSuccess(Update)
  380. return
  381. }
  382. s.Stats.Inc(UpdateFail)
  383. reportWriteFailure(Update)
  384. }()
  385. nodePath = path.Clean(path.Join("/", nodePath))
  386. // we do not allow the user to change "/"
  387. if s.readonlySet.Contains(nodePath) {
  388. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  389. }
  390. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  391. n, err := s.internalGet(nodePath)
  392. if err != nil { // if the node does not exist, return error
  393. return nil, err
  394. }
  395. if n.IsDir() && len(newValue) != 0 {
  396. // if the node is a directory, we cannot update value to non-empty
  397. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  398. }
  399. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  400. e.EtcdIndex = nextIndex
  401. e.PrevNode = n.Repr(false, false, s.clock)
  402. eNode := e.Node
  403. n.Write(newValue, nextIndex)
  404. if n.IsDir() {
  405. eNode.Dir = true
  406. } else {
  407. // copy the value for safety
  408. newValueCopy := newValue
  409. eNode.Value = &newValueCopy
  410. }
  411. // update ttl
  412. n.UpdateTTL(expireTime)
  413. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  414. s.WatcherHub.notify(e)
  415. s.CurrentIndex = nextIndex
  416. return e, nil
  417. }
  418. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  419. expireTime time.Time, action string) (*Event, *etcdErr.Error) {
  420. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  421. if unique { // append unique item under the node path
  422. nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
  423. }
  424. nodePath = path.Clean(path.Join("/", nodePath))
  425. // we do not allow the user to change "/"
  426. if s.readonlySet.Contains(nodePath) {
  427. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  428. }
  429. // Assume expire times that are way in the past are
  430. // This can occur when the time is serialized to JS
  431. if expireTime.Before(minExpireTime) {
  432. expireTime = Permanent
  433. }
  434. dirName, nodeName := path.Split(nodePath)
  435. // walk through the nodePath, create dirs and get the last directory node
  436. d, err := s.walk(dirName, s.checkDir)
  437. if err != nil {
  438. s.Stats.Inc(SetFail)
  439. reportWriteFailure(action)
  440. err.Index = currIndex
  441. return nil, err
  442. }
  443. e := newEvent(action, nodePath, nextIndex, nextIndex)
  444. eNode := e.Node
  445. n, _ := d.GetChild(nodeName)
  446. // force will try to replace an existing file
  447. if n != nil {
  448. if replace {
  449. if n.IsDir() {
  450. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  451. }
  452. e.PrevNode = n.Repr(false, false, s.clock)
  453. n.Remove(false, false, nil)
  454. } else {
  455. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  456. }
  457. }
  458. if !dir { // create file
  459. // copy the value for safety
  460. valueCopy := value
  461. eNode.Value = &valueCopy
  462. n = newKV(s, nodePath, value, nextIndex, d, expireTime)
  463. } else { // create directory
  464. eNode.Dir = true
  465. n = newDir(s, nodePath, nextIndex, d, expireTime)
  466. }
  467. // we are sure d is a directory and does not have the children with name n.Name
  468. d.Add(n)
  469. // node with TTL
  470. if !n.IsPermanent() {
  471. s.ttlKeyHeap.push(n)
  472. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  473. }
  474. s.CurrentIndex = nextIndex
  475. return e, nil
  476. }
  477. // InternalGet gets the node of the given nodePath.
  478. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  479. nodePath = path.Clean(path.Join("/", nodePath))
  480. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  481. if !parent.IsDir() {
  482. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  483. return nil, err
  484. }
  485. child, ok := parent.Children[name]
  486. if ok {
  487. return child, nil
  488. }
  489. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  490. }
  491. f, err := s.walk(nodePath, walkFunc)
  492. if err != nil {
  493. return nil, err
  494. }
  495. return f, nil
  496. }
  497. // deleteExpiredKyes will delete all
  498. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  499. s.worldLock.Lock()
  500. defer s.worldLock.Unlock()
  501. for {
  502. node := s.ttlKeyHeap.top()
  503. if node == nil || node.ExpireTime.After(cutoff) {
  504. break
  505. }
  506. s.CurrentIndex++
  507. e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
  508. e.EtcdIndex = s.CurrentIndex
  509. e.PrevNode = node.Repr(false, false, s.clock)
  510. callback := func(path string) { // notify function
  511. // notify the watchers with deleted set true
  512. s.WatcherHub.notifyWatchers(e, path, true)
  513. }
  514. s.ttlKeyHeap.pop()
  515. node.Remove(true, true, callback)
  516. reportExpiredKey()
  517. s.Stats.Inc(ExpireCount)
  518. s.WatcherHub.notify(e)
  519. }
  520. }
  521. // checkDir will check whether the component is a directory under parent node.
  522. // If it is a directory, this function will return the pointer to that node.
  523. // If it does not exist, this function will create a new directory and return the pointer to that node.
  524. // If it is a file, this function will return error.
  525. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  526. node, ok := parent.Children[dirName]
  527. if ok {
  528. if node.IsDir() {
  529. return node, nil
  530. }
  531. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  532. }
  533. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
  534. parent.Children[dirName] = n
  535. return n, nil
  536. }
  537. // Save saves the static state of the store system.
  538. // It will not be able to save the state of watchers.
  539. // It will not save the parent field of the node. Or there will
  540. // be cyclic dependencies issue for the json package.
  541. func (s *store) Save() ([]byte, error) {
  542. b, err := json.Marshal(s.Clone())
  543. if err != nil {
  544. return nil, err
  545. }
  546. return b, nil
  547. }
  548. func (s *store) SaveNoCopy() ([]byte, error) {
  549. b, err := json.Marshal(s)
  550. if err != nil {
  551. return nil, err
  552. }
  553. return b, nil
  554. }
  555. func (s *store) Clone() Store {
  556. s.worldLock.Lock()
  557. clonedStore := newStore()
  558. clonedStore.CurrentIndex = s.CurrentIndex
  559. clonedStore.Root = s.Root.Clone()
  560. clonedStore.WatcherHub = s.WatcherHub.clone()
  561. clonedStore.Stats = s.Stats.clone()
  562. clonedStore.CurrentVersion = s.CurrentVersion
  563. s.worldLock.Unlock()
  564. return clonedStore
  565. }
  566. // Recovery recovers the store system from a static state
  567. // It needs to recover the parent field of the nodes.
  568. // It needs to delete the expired nodes since the saved time and also
  569. // needs to create monitoring go routines.
  570. func (s *store) Recovery(state []byte) error {
  571. s.worldLock.Lock()
  572. defer s.worldLock.Unlock()
  573. err := json.Unmarshal(state, s)
  574. if err != nil {
  575. return err
  576. }
  577. s.ttlKeyHeap = newTtlKeyHeap()
  578. s.Root.recoverAndclean()
  579. return nil
  580. }
  581. func (s *store) JsonStats() []byte {
  582. s.Stats.Watchers = uint64(s.WatcherHub.count)
  583. return s.Stats.toJson()
  584. }