store.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package store
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "path"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  24. etcdErr "github.com/coreos/etcd/error"
  25. "github.com/coreos/etcd/pkg/types"
  26. )
  27. // The default version to set when the store is first initialized.
  28. const defaultVersion = 2
  29. var minExpireTime time.Time
  30. func init() {
  31. minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
  32. }
  33. type Store interface {
  34. Version() int
  35. Index() uint64
  36. Get(nodePath string, recursive, sorted bool) (*Event, error)
  37. Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
  38. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  39. Create(nodePath string, dir bool, value string, unique bool,
  40. expireTime time.Time) (*Event, error)
  41. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  42. value string, expireTime time.Time) (*Event, error)
  43. Delete(nodePath string, dir, recursive bool) (*Event, error)
  44. CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  45. Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
  46. Save() ([]byte, error)
  47. Recovery(state []byte) error
  48. JsonStats() []byte
  49. DeleteExpiredKeys(cutoff time.Time)
  50. }
// store is the in-memory implementation of Store: a tree of nodes rooted at
// Root, guarded by worldLock.
//
// Save serializes a clone of the store with encoding/json, and Recovery
// unmarshals a snapshot back into it. The exported field names below are
// therefore the snapshot's JSON keys. Unexported fields are not part of a
// snapshot.
type store struct {
	Root           *node           // root directory "/", created by newStore
	WatcherHub     *watcherHub     // receives write and expire events via notify
	CurrentIndex   uint64          // advanced by one for each successful write or expiry
	Stats          *Stats          // per-operation success/failure counters
	CurrentVersion int             // data version; defaultVersion for a new store
	ttlKeyHeap     *ttlKeyHeap     // not in snapshots; Recovery replaces it with a fresh heap
	worldLock      sync.RWMutex    // stop the world lock: reads take RLock, writes take Lock
	clock          clockwork.Clock // time source for TTL/expiration reporting; set by New
	readonlySet    types.Set       // paths writes refuse to modify: "/" plus newStore's namespaces
}
  62. // The given namespaces will be created as initial directories in the returned store.
  63. func New(namespaces ...string) Store {
  64. s := newStore(namespaces...)
  65. s.clock = clockwork.NewRealClock()
  66. return s
  67. }
  68. func newStore(namespaces ...string) *store {
  69. s := new(store)
  70. s.CurrentVersion = defaultVersion
  71. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  72. for _, namespace := range namespaces {
  73. s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, "", Permanent))
  74. }
  75. s.Stats = newStats()
  76. s.WatcherHub = newWatchHub(1000)
  77. s.ttlKeyHeap = newTtlKeyHeap()
  78. s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
  79. return s
  80. }
  81. // Version retrieves current version of the store.
  82. func (s *store) Version() int {
  83. return s.CurrentVersion
  84. }
  85. // Retrieves current of the store
  86. func (s *store) Index() uint64 {
  87. s.worldLock.RLock()
  88. defer s.worldLock.RUnlock()
  89. return s.CurrentIndex
  90. }
  91. // Get returns a get event.
  92. // If recursive is true, it will return all the content under the node path.
  93. // If sorted is true, it will sort the content by keys.
  94. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  95. s.worldLock.RLock()
  96. defer s.worldLock.RUnlock()
  97. nodePath = path.Clean(path.Join("/", nodePath))
  98. n, err := s.internalGet(nodePath)
  99. if err != nil {
  100. s.Stats.Inc(GetFail)
  101. return nil, err
  102. }
  103. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  104. e.EtcdIndex = s.CurrentIndex
  105. e.Node.loadInternalNode(n, recursive, sorted, s.clock)
  106. s.Stats.Inc(GetSuccess)
  107. return e, nil
  108. }
  109. // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  110. // If the node has already existed, create will fail.
  111. // If any node on the path is a file, create will fail.
  112. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
  113. s.worldLock.Lock()
  114. defer s.worldLock.Unlock()
  115. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
  116. if err == nil {
  117. e.EtcdIndex = s.CurrentIndex
  118. s.WatcherHub.notify(e)
  119. s.Stats.Inc(CreateSuccess)
  120. } else {
  121. s.Stats.Inc(CreateFail)
  122. }
  123. return e, err
  124. }
  125. // Set creates or replace the node at nodePath.
  126. func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
  127. var err error
  128. s.worldLock.Lock()
  129. defer s.worldLock.Unlock()
  130. defer func() {
  131. if err == nil {
  132. s.Stats.Inc(SetSuccess)
  133. } else {
  134. s.Stats.Inc(SetFail)
  135. }
  136. }()
  137. // Get prevNode value
  138. n, getErr := s.internalGet(nodePath)
  139. if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
  140. err = getErr
  141. return nil, err
  142. }
  143. // Set new value
  144. e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
  145. if err != nil {
  146. return nil, err
  147. }
  148. e.EtcdIndex = s.CurrentIndex
  149. // Put prevNode into event
  150. if getErr == nil {
  151. prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  152. prev.Node.loadInternalNode(n, false, false, s.clock)
  153. e.PrevNode = prev.Node
  154. }
  155. s.WatcherHub.notify(e)
  156. return e, nil
  157. }
  158. // returns user-readable cause of failed comparison
  159. func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
  160. switch which {
  161. case CompareIndexNotMatch:
  162. return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
  163. case CompareValueNotMatch:
  164. return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
  165. default:
  166. return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  167. }
  168. }
  169. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  170. value string, expireTime time.Time) (*Event, error) {
  171. s.worldLock.Lock()
  172. defer s.worldLock.Unlock()
  173. nodePath = path.Clean(path.Join("/", nodePath))
  174. // we do not allow the user to change "/"
  175. if s.readonlySet.Contains(nodePath) {
  176. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  177. }
  178. n, err := s.internalGet(nodePath)
  179. if err != nil {
  180. s.Stats.Inc(CompareAndSwapFail)
  181. return nil, err
  182. }
  183. if n.IsDir() { // can only compare and swap file
  184. s.Stats.Inc(CompareAndSwapFail)
  185. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  186. }
  187. // If both of the prevValue and prevIndex are given, we will test both of them.
  188. // Command will be executed, only if both of the tests are successful.
  189. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  190. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  191. s.Stats.Inc(CompareAndSwapFail)
  192. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  193. }
  194. // update etcd index
  195. s.CurrentIndex++
  196. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  197. e.EtcdIndex = s.CurrentIndex
  198. e.PrevNode = n.Repr(false, false, s.clock)
  199. eNode := e.Node
  200. // if test succeed, write the value
  201. n.Write(value, s.CurrentIndex)
  202. n.UpdateTTL(expireTime)
  203. // copy the value for safety
  204. valueCopy := value
  205. eNode.Value = &valueCopy
  206. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  207. s.WatcherHub.notify(e)
  208. s.Stats.Inc(CompareAndSwapSuccess)
  209. return e, nil
  210. }
  211. // Delete deletes the node at the given path.
  212. // If the node is a directory, recursive must be true to delete it.
  213. func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
  214. s.worldLock.Lock()
  215. defer s.worldLock.Unlock()
  216. nodePath = path.Clean(path.Join("/", nodePath))
  217. // we do not allow the user to change "/"
  218. if s.readonlySet.Contains(nodePath) {
  219. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  220. }
  221. // recursive implies dir
  222. if recursive == true {
  223. dir = true
  224. }
  225. n, err := s.internalGet(nodePath)
  226. if err != nil { // if the node does not exist, return error
  227. s.Stats.Inc(DeleteFail)
  228. return nil, err
  229. }
  230. nextIndex := s.CurrentIndex + 1
  231. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  232. e.EtcdIndex = nextIndex
  233. e.PrevNode = n.Repr(false, false, s.clock)
  234. eNode := e.Node
  235. if n.IsDir() {
  236. eNode.Dir = true
  237. }
  238. callback := func(path string) { // notify function
  239. // notify the watchers with deleted set true
  240. s.WatcherHub.notifyWatchers(e, path, true)
  241. }
  242. err = n.Remove(dir, recursive, callback)
  243. if err != nil {
  244. s.Stats.Inc(DeleteFail)
  245. return nil, err
  246. }
  247. // update etcd index
  248. s.CurrentIndex++
  249. s.WatcherHub.notify(e)
  250. s.Stats.Inc(DeleteSuccess)
  251. return e, nil
  252. }
  253. func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
  254. nodePath = path.Clean(path.Join("/", nodePath))
  255. s.worldLock.Lock()
  256. defer s.worldLock.Unlock()
  257. n, err := s.internalGet(nodePath)
  258. if err != nil { // if the node does not exist, return error
  259. s.Stats.Inc(CompareAndDeleteFail)
  260. return nil, err
  261. }
  262. if n.IsDir() { // can only compare and delete file
  263. s.Stats.Inc(CompareAndSwapFail)
  264. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  265. }
  266. // If both of the prevValue and prevIndex are given, we will test both of them.
  267. // Command will be executed, only if both of the tests are successful.
  268. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  269. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  270. s.Stats.Inc(CompareAndDeleteFail)
  271. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  272. }
  273. // update etcd index
  274. s.CurrentIndex++
  275. e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  276. e.EtcdIndex = s.CurrentIndex
  277. e.PrevNode = n.Repr(false, false, s.clock)
  278. callback := func(path string) { // notify function
  279. // notify the watchers with deleted set true
  280. s.WatcherHub.notifyWatchers(e, path, true)
  281. }
  282. // delete a key-value pair, no error should happen
  283. n.Remove(false, false, callback)
  284. s.WatcherHub.notify(e)
  285. s.Stats.Inc(CompareAndDeleteSuccess)
  286. return e, nil
  287. }
  288. func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
  289. s.worldLock.RLock()
  290. defer s.worldLock.RUnlock()
  291. key = path.Clean(path.Join("/", key))
  292. if sinceIndex == 0 {
  293. sinceIndex = s.CurrentIndex + 1
  294. }
  295. // WatchHub does not know about the current index, so we need to pass it in
  296. w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
  297. if err != nil {
  298. return nil, err
  299. }
  300. return w, nil
  301. }
  302. // walk walks all the nodePath and apply the walkFunc on each directory
  303. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  304. components := strings.Split(nodePath, "/")
  305. curr := s.Root
  306. var err *etcdErr.Error
  307. for i := 1; i < len(components); i++ {
  308. if len(components[i]) == 0 { // ignore empty string
  309. return curr, nil
  310. }
  311. curr, err = walkFunc(curr, components[i])
  312. if err != nil {
  313. return nil, err
  314. }
  315. }
  316. return curr, nil
  317. }
  318. // Update updates the value/ttl of the node.
  319. // If the node is a file, the value and the ttl can be updated.
  320. // If the node is a directory, only the ttl can be updated.
  321. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  322. s.worldLock.Lock()
  323. defer s.worldLock.Unlock()
  324. nodePath = path.Clean(path.Join("/", nodePath))
  325. // we do not allow the user to change "/"
  326. if s.readonlySet.Contains(nodePath) {
  327. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  328. }
  329. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  330. n, err := s.internalGet(nodePath)
  331. if err != nil { // if the node does not exist, return error
  332. s.Stats.Inc(UpdateFail)
  333. return nil, err
  334. }
  335. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  336. e.EtcdIndex = nextIndex
  337. e.PrevNode = n.Repr(false, false, s.clock)
  338. eNode := e.Node
  339. if n.IsDir() && len(newValue) != 0 {
  340. // if the node is a directory, we cannot update value to non-empty
  341. s.Stats.Inc(UpdateFail)
  342. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  343. }
  344. n.Write(newValue, nextIndex)
  345. if n.IsDir() {
  346. eNode.Dir = true
  347. } else {
  348. // copy the value for safety
  349. newValueCopy := newValue
  350. eNode.Value = &newValueCopy
  351. }
  352. // update ttl
  353. n.UpdateTTL(expireTime)
  354. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  355. s.WatcherHub.notify(e)
  356. s.Stats.Inc(UpdateSuccess)
  357. s.CurrentIndex = nextIndex
  358. return e, nil
  359. }
  360. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  361. expireTime time.Time, action string) (*Event, error) {
  362. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  363. if unique { // append unique item under the node path
  364. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  365. }
  366. nodePath = path.Clean(path.Join("/", nodePath))
  367. // we do not allow the user to change "/"
  368. if s.readonlySet.Contains(nodePath) {
  369. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  370. }
  371. // Assume expire times that are way in the past are
  372. // This can occur when the time is serialized to JS
  373. if expireTime.Before(minExpireTime) {
  374. expireTime = Permanent
  375. }
  376. dirName, nodeName := path.Split(nodePath)
  377. // walk through the nodePath, create dirs and get the last directory node
  378. d, err := s.walk(dirName, s.checkDir)
  379. if err != nil {
  380. s.Stats.Inc(SetFail)
  381. err.Index = currIndex
  382. return nil, err
  383. }
  384. e := newEvent(action, nodePath, nextIndex, nextIndex)
  385. eNode := e.Node
  386. n, _ := d.GetChild(nodeName)
  387. // force will try to replace a existing file
  388. if n != nil {
  389. if replace {
  390. if n.IsDir() {
  391. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  392. }
  393. e.PrevNode = n.Repr(false, false, s.clock)
  394. n.Remove(false, false, nil)
  395. } else {
  396. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  397. }
  398. }
  399. if !dir { // create file
  400. // copy the value for safety
  401. valueCopy := value
  402. eNode.Value = &valueCopy
  403. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  404. } else { // create directory
  405. eNode.Dir = true
  406. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  407. }
  408. // we are sure d is a directory and does not have the children with name n.Name
  409. d.Add(n)
  410. // node with TTL
  411. if !n.IsPermanent() {
  412. s.ttlKeyHeap.push(n)
  413. eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
  414. }
  415. s.CurrentIndex = nextIndex
  416. return e, nil
  417. }
  418. // InternalGet gets the node of the given nodePath.
  419. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  420. nodePath = path.Clean(path.Join("/", nodePath))
  421. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  422. if !parent.IsDir() {
  423. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  424. return nil, err
  425. }
  426. child, ok := parent.Children[name]
  427. if ok {
  428. return child, nil
  429. }
  430. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  431. }
  432. f, err := s.walk(nodePath, walkFunc)
  433. if err != nil {
  434. return nil, err
  435. }
  436. return f, nil
  437. }
  438. // deleteExpiredKyes will delete all
  439. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  440. s.worldLock.Lock()
  441. defer s.worldLock.Unlock()
  442. for {
  443. node := s.ttlKeyHeap.top()
  444. if node == nil || node.ExpireTime.After(cutoff) {
  445. break
  446. }
  447. s.CurrentIndex++
  448. e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
  449. e.EtcdIndex = s.CurrentIndex
  450. e.PrevNode = node.Repr(false, false, s.clock)
  451. callback := func(path string) { // notify function
  452. // notify the watchers with deleted set true
  453. s.WatcherHub.notifyWatchers(e, path, true)
  454. }
  455. s.ttlKeyHeap.pop()
  456. node.Remove(true, true, callback)
  457. s.Stats.Inc(ExpireCount)
  458. s.WatcherHub.notify(e)
  459. }
  460. }
  461. // checkDir will check whether the component is a directory under parent node.
  462. // If it is a directory, this function will return the pointer to that node.
  463. // If it does not exist, this function will create a new directory and return the pointer to that node.
  464. // If it is a file, this function will return error.
  465. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  466. node, ok := parent.Children[dirName]
  467. if ok {
  468. if node.IsDir() {
  469. return node, nil
  470. }
  471. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  472. }
  473. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  474. parent.Children[dirName] = n
  475. return n, nil
  476. }
  477. // Save saves the static state of the store system.
  478. // It will not be able to save the state of watchers.
  479. // It will not save the parent field of the node. Or there will
  480. // be cyclic dependencies issue for the json package.
  481. func (s *store) Save() ([]byte, error) {
  482. s.worldLock.Lock()
  483. clonedStore := newStore()
  484. clonedStore.CurrentIndex = s.CurrentIndex
  485. clonedStore.Root = s.Root.Clone()
  486. clonedStore.WatcherHub = s.WatcherHub.clone()
  487. clonedStore.Stats = s.Stats.clone()
  488. clonedStore.CurrentVersion = s.CurrentVersion
  489. s.worldLock.Unlock()
  490. b, err := json.Marshal(clonedStore)
  491. if err != nil {
  492. return nil, err
  493. }
  494. return b, nil
  495. }
  496. // Recovery recovers the store system from a static state
  497. // It needs to recover the parent field of the nodes.
  498. // It needs to delete the expired nodes since the saved time and also
  499. // needs to create monitoring go routines.
  500. func (s *store) Recovery(state []byte) error {
  501. s.worldLock.Lock()
  502. defer s.worldLock.Unlock()
  503. err := json.Unmarshal(state, s)
  504. if err != nil {
  505. return err
  506. }
  507. s.ttlKeyHeap = newTtlKeyHeap()
  508. s.Root.recoverAndclean()
  509. return nil
  510. }
  511. func (s *store) JsonStats() []byte {
  512. s.Stats.Watchers = uint64(s.WatcherHub.count)
  513. return s.Stats.toJson()
  514. }