store.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. etcdErr "github.com/coreos/etcd/error"
  23. )
  24. // The default version to set when the store is first initialized.
  25. const defaultVersion = 2
  26. var minExpireTime time.Time
  27. func init() {
  28. minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
  29. }
  30. type Store interface {
  31. Version() int
  32. Index() uint64
  33. Get(nodePath string, recursive, sorted bool) (*Event, error)
  34. Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
  35. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  36. Create(nodePath string, dir bool, value string, unique bool,
  37. expireTime time.Time) (*Event, error)
  38. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  39. value string, expireTime time.Time) (*Event, error)
  40. Delete(nodePath string, dir, recursive bool) (*Event, error)
  41. CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  42. Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
  43. Save() ([]byte, error)
  44. Recovery(state []byte) error
  45. TotalTransactions() uint64
  46. JsonStats() []byte
  47. DeleteExpiredKeys(cutoff time.Time)
  48. }
// store is the in-memory implementation of Store.
//
// The exported fields are what Save writes out through json.Marshal and what
// Recovery reads back through json.Unmarshal; the unexported fields are
// skipped by encoding/json. Field order is left untouched because it
// determines the layout of the serialized snapshot.
type store struct {
	Root           *node       // root directory node, created with path "/"
	WatcherHub     *watcherHub // receives an event for every successful mutation
	CurrentIndex   uint64      // index of the latest mutation; bumped by every successful write
	Stats          *Stats      // success/failure counters for each operation
	CurrentVersion int         // store version, defaultVersion for a fresh store
	ttlKeyHeap     *ttlKeyHeap // nodes with an expiration time; not serialized, so Recovery replaces it with a fresh heap
	worldLock      sync.RWMutex // stop the world lock: write-locked by mutations, read-locked by Get, Index and Watch
}
  58. func New() Store {
  59. return newStore()
  60. }
  61. func newStore() *store {
  62. s := new(store)
  63. s.CurrentVersion = defaultVersion
  64. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  65. s.Stats = newStats()
  66. s.WatcherHub = newWatchHub(1000)
  67. s.ttlKeyHeap = newTtlKeyHeap()
  68. return s
  69. }
  70. // Version retrieves current version of the store.
  71. func (s *store) Version() int {
  72. return s.CurrentVersion
  73. }
  74. // Retrieves current of the store
  75. func (s *store) Index() uint64 {
  76. s.worldLock.RLock()
  77. defer s.worldLock.RUnlock()
  78. return s.CurrentIndex
  79. }
  80. // Get returns a get event.
  81. // If recursive is true, it will return all the content under the node path.
  82. // If sorted is true, it will sort the content by keys.
  83. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  84. s.worldLock.RLock()
  85. defer s.worldLock.RUnlock()
  86. nodePath = path.Clean(path.Join("/", nodePath))
  87. n, err := s.internalGet(nodePath)
  88. if err != nil {
  89. s.Stats.Inc(GetFail)
  90. return nil, err
  91. }
  92. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  93. e.EtcdIndex = s.CurrentIndex
  94. e.Node.loadInternalNode(n, recursive, sorted)
  95. s.Stats.Inc(GetSuccess)
  96. return e, nil
  97. }
  98. // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  99. // If the node has already existed, create will fail.
  100. // If any node on the path is a file, create will fail.
  101. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
  102. s.worldLock.Lock()
  103. defer s.worldLock.Unlock()
  104. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
  105. if err == nil {
  106. e.EtcdIndex = s.CurrentIndex
  107. s.WatcherHub.notify(e)
  108. s.Stats.Inc(CreateSuccess)
  109. } else {
  110. s.Stats.Inc(CreateFail)
  111. }
  112. return e, err
  113. }
  114. // Set creates or replace the node at nodePath.
  115. func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
  116. var err error
  117. s.worldLock.Lock()
  118. defer s.worldLock.Unlock()
  119. defer func() {
  120. if err == nil {
  121. s.Stats.Inc(SetSuccess)
  122. } else {
  123. s.Stats.Inc(SetFail)
  124. }
  125. }()
  126. // Get prevNode value
  127. n, getErr := s.internalGet(nodePath)
  128. if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
  129. err = getErr
  130. return nil, err
  131. }
  132. // Set new value
  133. e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
  134. if err != nil {
  135. return nil, err
  136. }
  137. e.EtcdIndex = s.CurrentIndex
  138. // Put prevNode into event
  139. if getErr == nil {
  140. prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  141. prev.Node.loadInternalNode(n, false, false)
  142. e.PrevNode = prev.Node
  143. }
  144. s.WatcherHub.notify(e)
  145. return e, nil
  146. }
  147. // returns user-readable cause of failed comparison
  148. func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
  149. switch which {
  150. case CompareIndexNotMatch:
  151. return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
  152. case CompareValueNotMatch:
  153. return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
  154. default:
  155. return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  156. }
  157. }
  158. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  159. value string, expireTime time.Time) (*Event, error) {
  160. s.worldLock.Lock()
  161. defer s.worldLock.Unlock()
  162. nodePath = path.Clean(path.Join("/", nodePath))
  163. // we do not allow the user to change "/"
  164. if nodePath == "/" {
  165. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  166. }
  167. n, err := s.internalGet(nodePath)
  168. if err != nil {
  169. s.Stats.Inc(CompareAndSwapFail)
  170. return nil, err
  171. }
  172. if n.IsDir() { // can only compare and swap file
  173. s.Stats.Inc(CompareAndSwapFail)
  174. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  175. }
  176. // If both of the prevValue and prevIndex are given, we will test both of them.
  177. // Command will be executed, only if both of the tests are successful.
  178. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  179. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  180. s.Stats.Inc(CompareAndSwapFail)
  181. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  182. }
  183. // update etcd index
  184. s.CurrentIndex++
  185. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  186. e.EtcdIndex = s.CurrentIndex
  187. e.PrevNode = n.Repr(false, false)
  188. eNode := e.Node
  189. // if test succeed, write the value
  190. n.Write(value, s.CurrentIndex)
  191. n.UpdateTTL(expireTime)
  192. // copy the value for safety
  193. valueCopy := value
  194. eNode.Value = &valueCopy
  195. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  196. s.WatcherHub.notify(e)
  197. s.Stats.Inc(CompareAndSwapSuccess)
  198. return e, nil
  199. }
  200. // Delete deletes the node at the given path.
  201. // If the node is a directory, recursive must be true to delete it.
  202. func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
  203. s.worldLock.Lock()
  204. defer s.worldLock.Unlock()
  205. nodePath = path.Clean(path.Join("/", nodePath))
  206. // we do not allow the user to change "/"
  207. if nodePath == "/" {
  208. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  209. }
  210. // recursive implies dir
  211. if recursive == true {
  212. dir = true
  213. }
  214. n, err := s.internalGet(nodePath)
  215. if err != nil { // if the node does not exist, return error
  216. s.Stats.Inc(DeleteFail)
  217. return nil, err
  218. }
  219. nextIndex := s.CurrentIndex + 1
  220. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  221. e.EtcdIndex = s.CurrentIndex
  222. e.PrevNode = n.Repr(false, false)
  223. eNode := e.Node
  224. if n.IsDir() {
  225. eNode.Dir = true
  226. }
  227. callback := func(path string) { // notify function
  228. // notify the watchers with deleted set true
  229. s.WatcherHub.notifyWatchers(e, path, true)
  230. }
  231. err = n.Remove(dir, recursive, callback)
  232. if err != nil {
  233. s.Stats.Inc(DeleteFail)
  234. return nil, err
  235. }
  236. // update etcd index
  237. s.CurrentIndex++
  238. s.WatcherHub.notify(e)
  239. s.Stats.Inc(DeleteSuccess)
  240. return e, nil
  241. }
  242. func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
  243. nodePath = path.Clean(path.Join("/", nodePath))
  244. s.worldLock.Lock()
  245. defer s.worldLock.Unlock()
  246. n, err := s.internalGet(nodePath)
  247. if err != nil { // if the node does not exist, return error
  248. s.Stats.Inc(CompareAndDeleteFail)
  249. return nil, err
  250. }
  251. if n.IsDir() { // can only compare and delete file
  252. s.Stats.Inc(CompareAndSwapFail)
  253. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  254. }
  255. // If both of the prevValue and prevIndex are given, we will test both of them.
  256. // Command will be executed, only if both of the tests are successful.
  257. if ok, which := n.Compare(prevValue, prevIndex); !ok {
  258. cause := getCompareFailCause(n, which, prevValue, prevIndex)
  259. s.Stats.Inc(CompareAndDeleteFail)
  260. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  261. }
  262. // update etcd index
  263. s.CurrentIndex++
  264. e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  265. e.EtcdIndex = s.CurrentIndex
  266. e.PrevNode = n.Repr(false, false)
  267. callback := func(path string) { // notify function
  268. // notify the watchers with deleted set true
  269. s.WatcherHub.notifyWatchers(e, path, true)
  270. }
  271. // delete a key-value pair, no error should happen
  272. n.Remove(false, false, callback)
  273. s.WatcherHub.notify(e)
  274. s.Stats.Inc(CompareAndDeleteSuccess)
  275. return e, nil
  276. }
  277. func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
  278. s.worldLock.RLock()
  279. defer s.worldLock.RUnlock()
  280. key = path.Clean(path.Join("/", key))
  281. if sinceIndex == 0 {
  282. sinceIndex = s.CurrentIndex + 1
  283. }
  284. // WatchHub does not know about the current index, so we need to pass it in
  285. w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
  286. if err != nil {
  287. return nil, err
  288. }
  289. return w, nil
  290. }
  291. // walk walks all the nodePath and apply the walkFunc on each directory
  292. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  293. components := strings.Split(nodePath, "/")
  294. curr := s.Root
  295. var err *etcdErr.Error
  296. for i := 1; i < len(components); i++ {
  297. if len(components[i]) == 0 { // ignore empty string
  298. return curr, nil
  299. }
  300. curr, err = walkFunc(curr, components[i])
  301. if err != nil {
  302. return nil, err
  303. }
  304. }
  305. return curr, nil
  306. }
  307. // Update updates the value/ttl of the node.
  308. // If the node is a file, the value and the ttl can be updated.
  309. // If the node is a directory, only the ttl can be updated.
  310. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  311. s.worldLock.Lock()
  312. defer s.worldLock.Unlock()
  313. nodePath = path.Clean(path.Join("/", nodePath))
  314. // we do not allow the user to change "/"
  315. if nodePath == "/" {
  316. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  317. }
  318. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  319. n, err := s.internalGet(nodePath)
  320. if err != nil { // if the node does not exist, return error
  321. s.Stats.Inc(UpdateFail)
  322. return nil, err
  323. }
  324. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  325. e.EtcdIndex = s.CurrentIndex
  326. e.PrevNode = n.Repr(false, false)
  327. eNode := e.Node
  328. if n.IsDir() && len(newValue) != 0 {
  329. // if the node is a directory, we cannot update value to non-empty
  330. s.Stats.Inc(UpdateFail)
  331. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  332. }
  333. n.Write(newValue, nextIndex)
  334. if n.IsDir() {
  335. eNode.Dir = true
  336. } else {
  337. // copy the value for safety
  338. newValueCopy := newValue
  339. eNode.Value = &newValueCopy
  340. }
  341. // update ttl
  342. n.UpdateTTL(expireTime)
  343. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  344. s.WatcherHub.notify(e)
  345. s.Stats.Inc(UpdateSuccess)
  346. s.CurrentIndex = nextIndex
  347. return e, nil
  348. }
  349. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  350. expireTime time.Time, action string) (*Event, error) {
  351. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  352. if unique { // append unique item under the node path
  353. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  354. }
  355. nodePath = path.Clean(path.Join("/", nodePath))
  356. // we do not allow the user to change "/"
  357. if nodePath == "/" {
  358. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  359. }
  360. // Assume expire times that are way in the past are not valid.
  361. // This can occur when the time is serialized to JSON and read back in.
  362. if expireTime.Before(minExpireTime) {
  363. expireTime = Permanent
  364. }
  365. dirName, nodeName := path.Split(nodePath)
  366. // walk through the nodePath, create dirs and get the last directory node
  367. d, err := s.walk(dirName, s.checkDir)
  368. if err != nil {
  369. s.Stats.Inc(SetFail)
  370. err.Index = currIndex
  371. return nil, err
  372. }
  373. e := newEvent(action, nodePath, nextIndex, nextIndex)
  374. eNode := e.Node
  375. n, _ := d.GetChild(nodeName)
  376. // force will try to replace a existing file
  377. if n != nil {
  378. if replace {
  379. if n.IsDir() {
  380. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  381. }
  382. e.PrevNode = n.Repr(false, false)
  383. n.Remove(false, false, nil)
  384. } else {
  385. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  386. }
  387. }
  388. if !dir { // create file
  389. // copy the value for safety
  390. valueCopy := value
  391. eNode.Value = &valueCopy
  392. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  393. } else { // create directory
  394. eNode.Dir = true
  395. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  396. }
  397. // we are sure d is a directory and does not have the children with name n.Name
  398. d.Add(n)
  399. // node with TTL
  400. if !n.IsPermanent() {
  401. s.ttlKeyHeap.push(n)
  402. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  403. }
  404. s.CurrentIndex = nextIndex
  405. return e, nil
  406. }
  407. // InternalGet gets the node of the given nodePath.
  408. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  409. nodePath = path.Clean(path.Join("/", nodePath))
  410. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  411. if !parent.IsDir() {
  412. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  413. return nil, err
  414. }
  415. child, ok := parent.Children[name]
  416. if ok {
  417. return child, nil
  418. }
  419. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  420. }
  421. f, err := s.walk(nodePath, walkFunc)
  422. if err != nil {
  423. return nil, err
  424. }
  425. return f, nil
  426. }
  427. // deleteExpiredKyes will delete all
  428. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  429. s.worldLock.Lock()
  430. defer s.worldLock.Unlock()
  431. for {
  432. node := s.ttlKeyHeap.top()
  433. if node == nil || node.ExpireTime.After(cutoff) {
  434. break
  435. }
  436. s.CurrentIndex++
  437. e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
  438. e.EtcdIndex = s.CurrentIndex
  439. e.PrevNode = node.Repr(false, false)
  440. callback := func(path string) { // notify function
  441. // notify the watchers with deleted set true
  442. s.WatcherHub.notifyWatchers(e, path, true)
  443. }
  444. s.ttlKeyHeap.pop()
  445. node.Remove(true, true, callback)
  446. s.Stats.Inc(ExpireCount)
  447. s.WatcherHub.notify(e)
  448. }
  449. }
  450. // checkDir will check whether the component is a directory under parent node.
  451. // If it is a directory, this function will return the pointer to that node.
  452. // If it does not exist, this function will create a new directory and return the pointer to that node.
  453. // If it is a file, this function will return error.
  454. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  455. node, ok := parent.Children[dirName]
  456. if ok {
  457. if node.IsDir() {
  458. return node, nil
  459. }
  460. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  461. }
  462. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  463. parent.Children[dirName] = n
  464. return n, nil
  465. }
  466. // Save saves the static state of the store system.
  467. // It will not be able to save the state of watchers.
  468. // It will not save the parent field of the node. Or there will
  469. // be cyclic dependencies issue for the json package.
  470. func (s *store) Save() ([]byte, error) {
  471. s.worldLock.Lock()
  472. clonedStore := newStore()
  473. clonedStore.CurrentIndex = s.CurrentIndex
  474. clonedStore.Root = s.Root.Clone()
  475. clonedStore.WatcherHub = s.WatcherHub.clone()
  476. clonedStore.Stats = s.Stats.clone()
  477. clonedStore.CurrentVersion = s.CurrentVersion
  478. s.worldLock.Unlock()
  479. b, err := json.Marshal(clonedStore)
  480. if err != nil {
  481. return nil, err
  482. }
  483. return b, nil
  484. }
  485. // Recovery recovers the store system from a static state
  486. // It needs to recover the parent field of the nodes.
  487. // It needs to delete the expired nodes since the saved time and also
  488. // needs to create monitoring go routines.
  489. func (s *store) Recovery(state []byte) error {
  490. s.worldLock.Lock()
  491. defer s.worldLock.Unlock()
  492. err := json.Unmarshal(state, s)
  493. if err != nil {
  494. return err
  495. }
  496. s.ttlKeyHeap = newTtlKeyHeap()
  497. s.Root.recoverAndclean()
  498. return nil
  499. }
  500. func (s *store) JsonStats() []byte {
  501. s.Stats.Watchers = uint64(s.WatcherHub.count)
  502. return s.Stats.toJson()
  503. }
  504. func (s *store) TotalTransactions() uint64 {
  505. return s.Stats.TotalTranscations()
  506. }