store.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. etcdErr "github.com/coreos/etcd/error"
  23. )
  24. // The default version to set when the store is first initialized.
  25. const defaultVersion = 2
  26. var minExpireTime time.Time
  27. func init() {
  28. minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
  29. }
  30. type Store interface {
  31. Version() int
  32. CommandFactory() CommandFactory
  33. Index() uint64
  34. Get(nodePath string, recursive, sorted bool) (*Event, error)
  35. Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error)
  36. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  37. Create(nodePath string, dir bool, value string, unique bool,
  38. expireTime time.Time) (*Event, error)
  39. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  40. value string, expireTime time.Time) (*Event, error)
  41. Delete(nodePath string, recursive, dir bool) (*Event, error)
  42. CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
  43. Watch(prefix string, recursive, stream bool, sinceIndex uint64) (*Watcher, error)
  44. Save() ([]byte, error)
  45. Recovery(state []byte) error
  46. TotalTransactions() uint64
  47. JsonStats() []byte
  48. DeleteExpiredKeys(cutoff time.Time)
  49. }
  50. type store struct {
  51. Root *node
  52. WatcherHub *watcherHub
  53. CurrentIndex uint64
  54. Stats *Stats
  55. CurrentVersion int
  56. ttlKeyHeap *ttlKeyHeap // need to recovery manually
  57. worldLock sync.RWMutex // stop the world lock
  58. }
  59. func New() Store {
  60. return newStore()
  61. }
  62. func newStore() *store {
  63. s := new(store)
  64. s.CurrentVersion = defaultVersion
  65. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  66. s.Stats = newStats()
  67. s.WatcherHub = newWatchHub(1000)
  68. s.ttlKeyHeap = newTtlKeyHeap()
  69. return s
  70. }
  71. // Version retrieves current version of the store.
  72. func (s *store) Version() int {
  73. return s.CurrentVersion
  74. }
  75. // Retrieves current of the store
  76. func (s *store) Index() uint64 {
  77. return s.CurrentIndex
  78. }
  79. // CommandFactory retrieves the command factory for the current version of the store.
  80. func (s *store) CommandFactory() CommandFactory {
  81. return GetCommandFactory(s.Version())
  82. }
  83. // Get returns a get event.
  84. // If recursive is true, it will return all the content under the node path.
  85. // If sorted is true, it will sort the content by keys.
  86. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  87. s.worldLock.RLock()
  88. defer s.worldLock.RUnlock()
  89. nodePath = path.Clean(path.Join("/", nodePath))
  90. n, err := s.internalGet(nodePath)
  91. if err != nil {
  92. s.Stats.Inc(GetFail)
  93. return nil, err
  94. }
  95. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  96. e.Node.loadInternalNode(n, recursive, sorted)
  97. s.Stats.Inc(GetSuccess)
  98. return e, nil
  99. }
  100. // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  101. // If the node has already existed, create will fail.
  102. // If any node on the path is a file, create will fail.
  103. func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireTime time.Time) (*Event, error) {
  104. s.worldLock.Lock()
  105. defer s.worldLock.Unlock()
  106. e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
  107. if err == nil {
  108. s.Stats.Inc(CreateSuccess)
  109. } else {
  110. s.Stats.Inc(CreateFail)
  111. }
  112. return e, err
  113. }
  114. // Set creates or replace the node at nodePath.
  115. func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Time) (*Event, error) {
  116. var err error
  117. s.worldLock.Lock()
  118. defer s.worldLock.Unlock()
  119. defer func() {
  120. if err == nil {
  121. s.Stats.Inc(SetSuccess)
  122. } else {
  123. s.Stats.Inc(SetFail)
  124. }
  125. }()
  126. // Get prevNode value
  127. n, getErr := s.internalGet(nodePath)
  128. if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
  129. err = getErr
  130. return nil, err
  131. }
  132. // Set new value
  133. e, err := s.internalCreate(nodePath, dir, value, false, true, expireTime, Set)
  134. if err != nil {
  135. return nil, err
  136. }
  137. // Put prevNode into event
  138. if getErr == nil {
  139. prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  140. prev.Node.loadInternalNode(n, false, false)
  141. e.PrevNode = prev.Node
  142. }
  143. return e, nil
  144. }
  145. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  146. value string, expireTime time.Time) (*Event, error) {
  147. nodePath = path.Clean(path.Join("/", nodePath))
  148. // we do not allow the user to change "/"
  149. if nodePath == "/" {
  150. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  151. }
  152. s.worldLock.Lock()
  153. defer s.worldLock.Unlock()
  154. n, err := s.internalGet(nodePath)
  155. if err != nil {
  156. s.Stats.Inc(CompareAndSwapFail)
  157. return nil, err
  158. }
  159. if n.IsDir() { // can only compare and swap file
  160. s.Stats.Inc(CompareAndSwapFail)
  161. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  162. }
  163. // If both of the prevValue and prevIndex are given, we will test both of them.
  164. // Command will be executed, only if both of the tests are successful.
  165. if !n.Compare(prevValue, prevIndex) {
  166. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  167. s.Stats.Inc(CompareAndSwapFail)
  168. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  169. }
  170. // update etcd index
  171. s.CurrentIndex++
  172. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  173. e.PrevNode = n.Repr(false, false)
  174. eNode := e.Node
  175. // if test succeed, write the value
  176. n.Write(value, s.CurrentIndex)
  177. n.UpdateTTL(expireTime)
  178. eNode.Value = value
  179. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  180. s.WatcherHub.notify(e)
  181. s.Stats.Inc(CompareAndSwapSuccess)
  182. return e, nil
  183. }
  184. // Delete deletes the node at the given path.
  185. // If the node is a directory, recursive must be true to delete it.
  186. func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
  187. nodePath = path.Clean(path.Join("/", nodePath))
  188. // we do not allow the user to change "/"
  189. if nodePath == "/" {
  190. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  191. }
  192. s.worldLock.Lock()
  193. defer s.worldLock.Unlock()
  194. // recursive implies dir
  195. if recursive == true {
  196. dir = true
  197. }
  198. n, err := s.internalGet(nodePath)
  199. if err != nil { // if the node does not exist, return error
  200. s.Stats.Inc(DeleteFail)
  201. return nil, err
  202. }
  203. nextIndex := s.CurrentIndex + 1
  204. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  205. e.PrevNode = n.Repr(false, false)
  206. eNode := e.Node
  207. if n.IsDir() {
  208. eNode.Dir = true
  209. }
  210. callback := func(path string) { // notify function
  211. // notify the watchers with deleted set true
  212. s.WatcherHub.notifyWatchers(e, path, true)
  213. }
  214. err = n.Remove(dir, recursive, callback)
  215. if err != nil {
  216. s.Stats.Inc(DeleteFail)
  217. return nil, err
  218. }
  219. // update etcd index
  220. s.CurrentIndex++
  221. s.WatcherHub.notify(e)
  222. s.Stats.Inc(DeleteSuccess)
  223. return e, nil
  224. }
  225. func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
  226. nodePath = path.Clean(path.Join("/", nodePath))
  227. s.worldLock.Lock()
  228. defer s.worldLock.Unlock()
  229. n, err := s.internalGet(nodePath)
  230. if err != nil { // if the node does not exist, return error
  231. s.Stats.Inc(CompareAndDeleteFail)
  232. return nil, err
  233. }
  234. if n.IsDir() { // can only compare and delete file
  235. s.Stats.Inc(CompareAndSwapFail)
  236. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  237. }
  238. // If both of the prevValue and prevIndex are given, we will test both of them.
  239. // Command will be executed, only if both of the tests are successful.
  240. if !n.Compare(prevValue, prevIndex) {
  241. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  242. s.Stats.Inc(CompareAndDeleteFail)
  243. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  244. }
  245. // update etcd index
  246. s.CurrentIndex++
  247. e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  248. e.PrevNode = n.Repr(false, false)
  249. callback := func(path string) { // notify function
  250. // notify the watchers with deleted set true
  251. s.WatcherHub.notifyWatchers(e, path, true)
  252. }
  253. // delete a key-value pair, no error should happen
  254. n.Remove(false, false, callback)
  255. s.WatcherHub.notify(e)
  256. s.Stats.Inc(CompareAndDeleteSuccess)
  257. return e, nil
  258. }
  259. func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (*Watcher, error) {
  260. key = path.Clean(path.Join("/", key))
  261. nextIndex := s.CurrentIndex + 1
  262. s.worldLock.RLock()
  263. defer s.worldLock.RUnlock()
  264. var w *Watcher
  265. var err *etcdErr.Error
  266. if sinceIndex == 0 {
  267. w, err = s.WatcherHub.watch(key, recursive, stream, nextIndex)
  268. } else {
  269. w, err = s.WatcherHub.watch(key, recursive, stream, sinceIndex)
  270. }
  271. if err != nil {
  272. // watchhub do not know the current Index
  273. // we need to attach the currentIndex here
  274. err.Index = s.CurrentIndex
  275. return nil, err
  276. }
  277. return w, nil
  278. }
  279. // walk walks all the nodePath and apply the walkFunc on each directory
  280. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  281. components := strings.Split(nodePath, "/")
  282. curr := s.Root
  283. var err *etcdErr.Error
  284. for i := 1; i < len(components); i++ {
  285. if len(components[i]) == 0 { // ignore empty string
  286. return curr, nil
  287. }
  288. curr, err = walkFunc(curr, components[i])
  289. if err != nil {
  290. return nil, err
  291. }
  292. }
  293. return curr, nil
  294. }
  295. // Update updates the value/ttl of the node.
  296. // If the node is a file, the value and the ttl can be updated.
  297. // If the node is a directory, only the ttl can be updated.
  298. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  299. nodePath = path.Clean(path.Join("/", nodePath))
  300. // we do not allow the user to change "/"
  301. if nodePath == "/" {
  302. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  303. }
  304. s.worldLock.Lock()
  305. defer s.worldLock.Unlock()
  306. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  307. n, err := s.internalGet(nodePath)
  308. if err != nil { // if the node does not exist, return error
  309. s.Stats.Inc(UpdateFail)
  310. return nil, err
  311. }
  312. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  313. e.PrevNode = n.Repr(false, false)
  314. eNode := e.Node
  315. if n.IsDir() && len(newValue) != 0 {
  316. // if the node is a directory, we cannot update value to non-empty
  317. s.Stats.Inc(UpdateFail)
  318. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  319. }
  320. n.Write(newValue, nextIndex)
  321. eNode.Value = newValue
  322. // update ttl
  323. n.UpdateTTL(expireTime)
  324. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  325. s.WatcherHub.notify(e)
  326. s.Stats.Inc(UpdateSuccess)
  327. s.CurrentIndex = nextIndex
  328. return e, nil
  329. }
  330. func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  331. expireTime time.Time, action string) (*Event, error) {
  332. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  333. if unique { // append unique item under the node path
  334. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  335. }
  336. nodePath = path.Clean(path.Join("/", nodePath))
  337. // we do not allow the user to change "/"
  338. if nodePath == "/" {
  339. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  340. }
  341. // Assume expire times that are way in the past are not valid.
  342. // This can occur when the time is serialized to JSON and read back in.
  343. if expireTime.Before(minExpireTime) {
  344. expireTime = Permanent
  345. }
  346. dirName, nodeName := path.Split(nodePath)
  347. // walk through the nodePath, create dirs and get the last directory node
  348. d, err := s.walk(dirName, s.checkDir)
  349. if err != nil {
  350. s.Stats.Inc(SetFail)
  351. err.Index = currIndex
  352. return nil, err
  353. }
  354. e := newEvent(action, nodePath, nextIndex, nextIndex)
  355. eNode := e.Node
  356. n, _ := d.GetChild(nodeName)
  357. // force will try to replace a existing file
  358. if n != nil {
  359. if replace {
  360. if n.IsDir() {
  361. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  362. }
  363. e.PrevNode = n.Repr(false, false)
  364. n.Remove(false, false, nil)
  365. } else {
  366. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  367. }
  368. }
  369. if !dir { // create file
  370. eNode.Value = value
  371. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  372. } else { // create directory
  373. eNode.Dir = true
  374. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  375. }
  376. // we are sure d is a directory and does not have the children with name n.Name
  377. d.Add(n)
  378. // node with TTL
  379. if !n.IsPermanent() {
  380. s.ttlKeyHeap.push(n)
  381. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  382. }
  383. s.CurrentIndex = nextIndex
  384. s.WatcherHub.notify(e)
  385. return e, nil
  386. }
  387. // InternalGet gets the node of the given nodePath.
  388. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  389. nodePath = path.Clean(path.Join("/", nodePath))
  390. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  391. if !parent.IsDir() {
  392. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  393. return nil, err
  394. }
  395. child, ok := parent.Children[name]
  396. if ok {
  397. return child, nil
  398. }
  399. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  400. }
  401. f, err := s.walk(nodePath, walkFunc)
  402. if err != nil {
  403. return nil, err
  404. }
  405. return f, nil
  406. }
  407. // deleteExpiredKyes will delete all
  408. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  409. s.worldLock.Lock()
  410. defer s.worldLock.Unlock()
  411. for {
  412. node := s.ttlKeyHeap.top()
  413. if node == nil || node.ExpireTime.After(cutoff) {
  414. break
  415. }
  416. s.CurrentIndex++
  417. e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
  418. e.PrevNode = node.Repr(false, false)
  419. callback := func(path string) { // notify function
  420. // notify the watchers with deleted set true
  421. s.WatcherHub.notifyWatchers(e, path, true)
  422. }
  423. s.ttlKeyHeap.pop()
  424. node.Remove(true, true, callback)
  425. s.Stats.Inc(ExpireCount)
  426. s.WatcherHub.notify(e)
  427. }
  428. }
  429. // checkDir will check whether the component is a directory under parent node.
  430. // If it is a directory, this function will return the pointer to that node.
  431. // If it does not exist, this function will create a new directory and return the pointer to that node.
  432. // If it is a file, this function will return error.
  433. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  434. node, ok := parent.Children[dirName]
  435. if ok {
  436. if node.IsDir() {
  437. return node, nil
  438. }
  439. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  440. }
  441. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  442. parent.Children[dirName] = n
  443. return n, nil
  444. }
  445. // Save saves the static state of the store system.
  446. // It will not be able to save the state of watchers.
  447. // It will not save the parent field of the node. Or there will
  448. // be cyclic dependencies issue for the json package.
  449. func (s *store) Save() ([]byte, error) {
  450. s.worldLock.Lock()
  451. clonedStore := newStore()
  452. clonedStore.CurrentIndex = s.CurrentIndex
  453. clonedStore.Root = s.Root.Clone()
  454. clonedStore.WatcherHub = s.WatcherHub.clone()
  455. clonedStore.Stats = s.Stats.clone()
  456. clonedStore.CurrentVersion = s.CurrentVersion
  457. s.worldLock.Unlock()
  458. b, err := json.Marshal(clonedStore)
  459. if err != nil {
  460. return nil, err
  461. }
  462. return b, nil
  463. }
  464. // Recovery recovers the store system from a static state
  465. // It needs to recover the parent field of the nodes.
  466. // It needs to delete the expired nodes since the saved time and also
  467. // needs to create monitoring go routines.
  468. func (s *store) Recovery(state []byte) error {
  469. s.worldLock.Lock()
  470. defer s.worldLock.Unlock()
  471. err := json.Unmarshal(state, s)
  472. if err != nil {
  473. return err
  474. }
  475. s.ttlKeyHeap = newTtlKeyHeap()
  476. s.Root.recoverAndclean()
  477. return nil
  478. }
  479. func (s *store) JsonStats() []byte {
  480. s.Stats.Watchers = uint64(s.WatcherHub.count)
  481. return s.Stats.toJson()
  482. }
  483. func (s *store) TotalTransactions() uint64 {
  484. return s.Stats.TotalTranscations()
  485. }