store.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. etcdErr "github.com/coreos/etcd/error"
  23. ustrings "github.com/coreos/etcd/pkg/strings"
  24. )
  25. // The default version to set when the store is first initialized.
  26. const defaultVersion = 2
  27. var minExpireTime time.Time
  28. func init() {
  29. minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
  30. }
  31. type Store interface {
  32. Version() int
  33. CommandFactory() CommandFactory
  34. Index() uint64
  35. Get(nodePath string, recursive, sorted bool) (*Event, error)
  36. Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
  37. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  38. Create(nodePath string, dir bool, value string, unique bool,
  39. expireTime time.Time) (*Event, error)
  40. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  41. value string, expireTime time.Time) (*Event, error)
  42. Delete(nodePath string, recursive, dir bool) (*Event, error)
  43. CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  44. Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
  45. Save() ([]byte, error)
  46. Recovery(state []byte) error
  47. TotalTransactions() uint64
  48. JsonStats() []byte
  49. DeleteExpiredKeys(cutoff time.Time)
  50. }
  51. type store struct {
  52. Root *node
  53. WatcherHub *watcherHub
  54. CurrentIndex uint64
  55. Stats *Stats
  56. CurrentVersion int
  57. ttlKeyHeap *ttlKeyHeap // need to recovery manually
  58. worldLock sync.RWMutex // stop the world lock
  59. }
  60. func New() Store {
  61. return newStore()
  62. }
  63. func newStore() *store {
  64. s := new(store)
  65. s.CurrentVersion = defaultVersion
  66. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  67. s.Stats = newStats()
  68. s.WatcherHub = newWatchHub(1000)
  69. s.ttlKeyHeap = newTtlKeyHeap()
  70. return s
  71. }
  72. // Version retrieves current version of the store.
  73. func (s *store) Version() int {
  74. return s.CurrentVersion
  75. }
  76. // Retrieves current of the store
  77. func (s *store) Index() uint64 {
  78. s.worldLock.RLock()
  79. defer s.worldLock.RUnlock()
  80. return s.CurrentIndex
  81. }
  82. // CommandFactory retrieves the command factory for the current version of the store.
  83. func (s *store) CommandFactory() CommandFactory {
  84. return GetCommandFactory(s.Version())
  85. }
  86. // Get returns a get event.
  87. // If recursive is true, it will return all the content under the node path.
  88. // If sorted is true, it will sort the content by keys.
  89. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  90. s.worldLock.RLock()
  91. defer s.worldLock.RUnlock()
  92. nodePath = path.Clean(path.Join("/", nodePath))
  93. n, err := s.internalGet(nodePath)
  94. if err != nil {
  95. s.Stats.Inc(GetFail)
  96. return nil, err
  97. }
  98. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  99. e.Node.loadInternalNode(n, recursive, sorted)
  100. s.Stats.Inc(GetSuccess)
  101. return e, nil
  102. }
  103. // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  104. // If the node has already existed, create will fail.
  105. // If any node on the path is a file, create will fail.
  106. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
  107. s.worldLock.Lock()
  108. defer s.worldLock.Unlock()
  109. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
  110. if err == nil {
  111. s.WatcherHub.notify(e)
  112. s.Stats.Inc(CreateSuccess)
  113. } else {
  114. s.Stats.Inc(CreateFail)
  115. }
  116. return e, err
  117. }
  118. // Set creates or replace the node at nodePath.
  119. func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
  120. var err error
  121. s.worldLock.Lock()
  122. defer s.worldLock.Unlock()
  123. defer func() {
  124. if err == nil {
  125. s.Stats.Inc(SetSuccess)
  126. } else {
  127. s.Stats.Inc(SetFail)
  128. }
  129. }()
  130. // Get prevNode value
  131. n, getErr := s.internalGet(nodePath)
  132. if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
  133. err = getErr
  134. return nil, err
  135. }
  136. // Set new value
  137. e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
  138. if err != nil {
  139. return nil, err
  140. }
  141. // Put prevNode into event
  142. if getErr == nil {
  143. prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  144. prev.Node.loadInternalNode(n, false, false)
  145. e.PrevNode = prev.Node
  146. }
  147. s.WatcherHub.notify(e)
  148. return e, nil
  149. }
  150. // returns user-readable cause of failed comparison
  151. func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
  152. switch which {
  153. case CompareIndexNotMatch:
  154. return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
  155. case CompareValueNotMatch:
  156. return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
  157. default:
  158. return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  159. }
  160. }
  161. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  162. value string, expireTime time.Time) (*Event, error) {
  163. s.worldLock.Lock()
  164. defer s.worldLock.Unlock()
  165. nodePath = path.Clean(path.Join("/", nodePath))
  166. // we do not allow the user to change "/"
  167. if nodePath == "/" {
  168. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  169. }
  170. n, err := s.internalGet(nodePath)
  171. if err != nil {
  172. s.Stats.Inc(CompareAndSwapFail)
  173. return nil, err
  174. }
  175. if n.IsDir() { // can only compare and swap file
  176. s.Stats.Inc(CompareAndSwapFail)
  177. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  178. }
  179. // If both of the prevValue and prevIndex are given, we will test both of them.
  180. // Command will be executed, only if both of the tests are successful.
  181. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  182. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  183. s.Stats.Inc(CompareAndSwapFail)
  184. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  185. }
  186. // update etcd index
  187. s.CurrentIndex++
  188. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  189. e.PrevNode = n.Repr(false, false)
  190. eNode := e.Node
  191. // if test succeed, write the value
  192. n.Write(value, s.CurrentIndex)
  193. n.UpdateTTL(expireTime)
  194. // copy the value for safety
  195. valueCopy := ustrings.Clone(value)
  196. eNode.Value = &valueCopy
  197. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  198. s.WatcherHub.notify(e)
  199. s.Stats.Inc(CompareAndSwapSuccess)
  200. return e, nil
  201. }
  202. // Delete deletes the node at the given path.
  203. // If the node is a directory, recursive must be true to delete it.
  204. func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
  205. s.worldLock.Lock()
  206. defer s.worldLock.Unlock()
  207. nodePath = path.Clean(path.Join("/", nodePath))
  208. // we do not allow the user to change "/"
  209. if nodePath == "/" {
  210. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  211. }
  212. // recursive implies dir
  213. if recursive == true {
  214. dir = true
  215. }
  216. n, err := s.internalGet(nodePath)
  217. if err != nil { // if the node does not exist, return error
  218. s.Stats.Inc(DeleteFail)
  219. return nil, err
  220. }
  221. nextIndex := s.CurrentIndex + 1
  222. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  223. e.PrevNode = n.Repr(false, false)
  224. eNode := e.Node
  225. if n.IsDir() {
  226. eNode.Dir = true
  227. }
  228. callback := func(path string) { // notify function
  229. // notify the watchers with deleted set true
  230. s.WatcherHub.notifyWatchers(e, path, true)
  231. }
  232. err = n.Remove(dir, recursive, callback)
  233. if err != nil {
  234. s.Stats.Inc(DeleteFail)
  235. return nil, err
  236. }
  237. // update etcd index
  238. s.CurrentIndex++
  239. s.WatcherHub.notify(e)
  240. s.Stats.Inc(DeleteSuccess)
  241. return e, nil
  242. }
  243. func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
  244. nodePath = path.Clean(path.Join("/", nodePath))
  245. s.worldLock.Lock()
  246. defer s.worldLock.Unlock()
  247. n, err := s.internalGet(nodePath)
  248. if err != nil { // if the node does not exist, return error
  249. s.Stats.Inc(CompareAndDeleteFail)
  250. return nil, err
  251. }
  252. if n.IsDir() { // can only compare and delete file
  253. s.Stats.Inc(CompareAndSwapFail)
  254. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  255. }
  256. // If both of the prevValue and prevIndex are given, we will test both of them.
  257. // Command will be executed, only if both of the tests are successful.
  258. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  259. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  260. s.Stats.Inc(CompareAndDeleteFail)
  261. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  262. }
  263. // update etcd index
  264. s.CurrentIndex++
  265. e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  266. e.PrevNode = n.Repr(false, false)
  267. callback := func(path string) { // notify function
  268. // notify the watchers with deleted set true
  269. s.WatcherHub.notifyWatchers(e, path, true)
  270. }
  271. // delete a key-value pair, no error should happen
  272. n.Remove(false, false, callback)
  273. s.WatcherHub.notify(e)
  274. s.Stats.Inc(CompareAndDeleteSuccess)
  275. return e, nil
  276. }
  277. func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) {
  278. s.worldLock.RLock()
  279. defer s.worldLock.RUnlock()
  280. key = path.Clean(path.Join("/", key))
  281. nextIndex := s.CurrentIndex + 1
  282. var w *Watcher
  283. var err *etcdErr.Error
  284. if sinceIndex == 0 {
  285. w, err = s.WatcherHub.watch(key, recursive, stream, nextIndex)
  286. } else {
  287. w, err = s.WatcherHub.watch(key, recursive, stream, sinceIndex)
  288. }
  289. if err != nil {
  290. // watchhub do not know the current Index
  291. // we need to attach the currentIndex here
  292. err.Index = s.CurrentIndex
  293. return nil, err
  294. }
  295. return w, nil
  296. }
  297. // walk walks all the nodePath and apply the walkFunc on each directory
  298. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  299. components := strings.Split(nodePath, "/")
  300. curr := s.Root
  301. var err *etcdErr.Error
  302. for i := 1; i < len(components); i++ {
  303. if len(components[i]) == 0 { // ignore empty string
  304. return curr, nil
  305. }
  306. curr, err = walkFunc(curr, components[i])
  307. if err != nil {
  308. return nil, err
  309. }
  310. }
  311. return curr, nil
  312. }
  313. // Update updates the value/ttl of the node.
  314. // If the node is a file, the value and the ttl can be updated.
  315. // If the node is a directory, only the ttl can be updated.
  316. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  317. s.worldLock.Lock()
  318. defer s.worldLock.Unlock()
  319. nodePath = path.Clean(path.Join("/", nodePath))
  320. // we do not allow the user to change "/"
  321. if nodePath == "/" {
  322. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  323. }
  324. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  325. n, err := s.internalGet(nodePath)
  326. if err != nil { // if the node does not exist, return error
  327. s.Stats.Inc(UpdateFail)
  328. return nil, err
  329. }
  330. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  331. e.PrevNode = n.Repr(false, false)
  332. eNode := e.Node
  333. if n.IsDir() && len(newValue) != 0 {
  334. // if the node is a directory, we cannot update value to non-empty
  335. s.Stats.Inc(UpdateFail)
  336. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  337. }
  338. n.Write(newValue, nextIndex)
  339. if n.IsDir() {
  340. eNode.Dir = true
  341. } else {
  342. // copy the value for safety
  343. newValueCopy := ustrings.Clone(newValue)
  344. eNode.Value = &newValueCopy
  345. }
  346. // update ttl
  347. n.UpdateTTL(expireTime)
  348. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  349. s.WatcherHub.notify(e)
  350. s.Stats.Inc(UpdateSuccess)
  351. s.CurrentIndex = nextIndex
  352. return e, nil
  353. }
  354. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  355. expireTime time.Time, action string) (*Event, error) {
  356. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  357. if unique { // append unique item under the node path
  358. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  359. }
  360. nodePath = path.Clean(path.Join("/", nodePath))
  361. // we do not allow the user to change "/"
  362. if nodePath == "/" {
  363. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  364. }
  365. // Assume expire times that are way in the past are not valid.
  366. // This can occur when the time is serialized to JSON and read back in.
  367. if expireTime.Before(minExpireTime) {
  368. expireTime = Permanent
  369. }
  370. dirName, nodeName := path.Split(nodePath)
  371. // walk through the nodePath, create dirs and get the last directory node
  372. d, err := s.walk(dirName, s.checkDir)
  373. if err != nil {
  374. s.Stats.Inc(SetFail)
  375. err.Index = currIndex
  376. return nil, err
  377. }
  378. e := newEvent(action, nodePath, nextIndex, nextIndex)
  379. eNode := e.Node
  380. n, _ := d.GetChild(nodeName)
  381. // force will try to replace a existing file
  382. if n != nil {
  383. if replace {
  384. if n.IsDir() {
  385. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  386. }
  387. e.PrevNode = n.Repr(false, false)
  388. n.Remove(false, false, nil)
  389. } else {
  390. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  391. }
  392. }
  393. if !dir { // create file
  394. // copy the value for safety
  395. valueCopy := ustrings.Clone(value)
  396. eNode.Value = &valueCopy
  397. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  398. } else { // create directory
  399. eNode.Dir = true
  400. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  401. }
  402. // we are sure d is a directory and does not have the children with name n.Name
  403. d.Add(n)
  404. // node with TTL
  405. if !n.IsPermanent() {
  406. s.ttlKeyHeap.push(n)
  407. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  408. }
  409. s.CurrentIndex = nextIndex
  410. return e, nil
  411. }
  412. // InternalGet gets the node of the given nodePath.
  413. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  414. nodePath = path.Clean(path.Join("/", nodePath))
  415. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  416. if !parent.IsDir() {
  417. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  418. return nil, err
  419. }
  420. child, ok := parent.Children[name]
  421. if ok {
  422. return child, nil
  423. }
  424. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  425. }
  426. f, err := s.walk(nodePath, walkFunc)
  427. if err != nil {
  428. return nil, err
  429. }
  430. return f, nil
  431. }
  432. // deleteExpiredKyes will delete all
  433. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  434. s.worldLock.Lock()
  435. defer s.worldLock.Unlock()
  436. for {
  437. node := s.ttlKeyHeap.top()
  438. if node == nil || node.ExpireTime.After(cutoff) {
  439. break
  440. }
  441. s.CurrentIndex++
  442. e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
  443. e.PrevNode = node.Repr(false, false)
  444. callback := func(path string) { // notify function
  445. // notify the watchers with deleted set true
  446. s.WatcherHub.notifyWatchers(e, path, true)
  447. }
  448. s.ttlKeyHeap.pop()
  449. node.Remove(true, true, callback)
  450. s.Stats.Inc(ExpireCount)
  451. s.WatcherHub.notify(e)
  452. }
  453. }
  454. // checkDir will check whether the component is a directory under parent node.
  455. // If it is a directory, this function will return the pointer to that node.
  456. // If it does not exist, this function will create a new directory and return the pointer to that node.
  457. // If it is a file, this function will return error.
  458. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  459. node, ok := parent.Children[dirName]
  460. if ok {
  461. if node.IsDir() {
  462. return node, nil
  463. }
  464. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  465. }
  466. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  467. parent.Children[dirName] = n
  468. return n, nil
  469. }
  470. // Save saves the static state of the store system.
  471. // It will not be able to save the state of watchers.
  472. // It will not save the parent field of the node. Or there will
  473. // be cyclic dependencies issue for the json package.
  474. func (s *store) Save() ([]byte, error) {
  475. s.worldLock.Lock()
  476. clonedStore := newStore()
  477. clonedStore.CurrentIndex = s.CurrentIndex
  478. clonedStore.Root = s.Root.Clone()
  479. clonedStore.WatcherHub = s.WatcherHub.clone()
  480. clonedStore.Stats = s.Stats.clone()
  481. clonedStore.CurrentVersion = s.CurrentVersion
  482. s.worldLock.Unlock()
  483. b, err := json.Marshal(clonedStore)
  484. if err != nil {
  485. return nil, err
  486. }
  487. return b, nil
  488. }
  489. // Recovery recovers the store system from a static state
  490. // It needs to recover the parent field of the nodes.
  491. // It needs to delete the expired nodes since the saved time and also
  492. // needs to create monitoring go routines.
  493. func (s *store) Recovery(state []byte) error {
  494. s.worldLock.Lock()
  495. defer s.worldLock.Unlock()
  496. err := json.Unmarshal(state, s)
  497. if err != nil {
  498. return err
  499. }
  500. s.ttlKeyHeap = newTtlKeyHeap()
  501. s.Root.recoverAndclean()
  502. return nil
  503. }
  504. func (s *store) JsonStats() []byte {
  505. s.Stats.Watchers = uint64(s.WatcherHub.count)
  506. return s.Stats.toJson()
  507. }
  508. func (s *store) TotalTransactions() uint64 {
  509. return s.Stats.TotalTranscations()
  510. }