store.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "sort"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. etcdErr "github.com/coreos/etcd/error"
  24. )
  25. // The default version to set when the store is first initialized.
  26. const defaultVersion = 2
  27. var minExpireTime time.Time
  28. func init() {
  29. minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
  30. }
  31. type Store interface {
  32. Version() int
  33. CommandFactory() CommandFactory
  34. Index() uint64
  35. Get(nodePath string, recursive, sorted bool) (*Event, error)
  36. Set(nodePath string, value string, expireTime time.Time) (*Event, error)
  37. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  38. Create(nodePath string, value string, incrementalSuffix bool,
  39. expireTime time.Time) (*Event, error)
  40. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  41. value string, expireTime time.Time) (*Event, error)
  42. Delete(nodePath string, recursive bool) (*Event, error)
  43. Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
  44. Save() ([]byte, error)
  45. Recovery(state []byte) error
  46. TotalTransactions() uint64
  47. JsonStats() []byte
  48. DeleteExpiredKeys(cutoff time.Time)
  49. }
  50. type store struct {
  51. Root *node
  52. WatcherHub *watcherHub
  53. CurrentIndex uint64
  54. Stats *Stats
  55. CurrentVersion int
  56. ttlKeyHeap *ttlKeyHeap // need to recovery manually
  57. worldLock sync.RWMutex // stop the world lock
  58. }
  59. func New() Store {
  60. return newStore()
  61. }
  62. func newStore() *store {
  63. s := new(store)
  64. s.CurrentVersion = defaultVersion
  65. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  66. s.Stats = newStats()
  67. s.WatcherHub = newWatchHub(1000)
  68. s.ttlKeyHeap = newTtlKeyHeap()
  69. return s
  70. }
  71. // Version retrieves current version of the store.
  72. func (s *store) Version() int {
  73. return s.CurrentVersion
  74. }
  75. // Retrieves current of the store
  76. func (s *store) Index() uint64 {
  77. return s.CurrentIndex
  78. }
  79. // CommandFactory retrieves the command factory for the current version of the store.
  80. func (s *store) CommandFactory() CommandFactory {
  81. return GetCommandFactory(s.Version())
  82. }
  83. // Get function returns a get event.
  84. // If recursive is true, it will return all the content under the node path.
  85. // If sorted is true, it will sort the content by keys.
  86. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  87. s.worldLock.RLock()
  88. defer s.worldLock.RUnlock()
  89. nodePath = path.Clean(path.Join("/", nodePath))
  90. n, err := s.internalGet(nodePath)
  91. if err != nil {
  92. s.Stats.Inc(GetFail)
  93. return nil, err
  94. }
  95. e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  96. eNode := e.Node
  97. if n.IsDir() { // node is a directory
  98. eNode.Dir = true
  99. children, _ := n.List()
  100. eNode.Nodes = make(NodeExterns, len(children))
  101. // we do not use the index in the children slice directly
  102. // we need to skip the hidden one
  103. i := 0
  104. for _, child := range children {
  105. if child.IsHidden() { // get will not return hidden nodes
  106. continue
  107. }
  108. eNode.Nodes[i] = child.Repr(recursive, sorted)
  109. i++
  110. }
  111. // eliminate hidden nodes
  112. eNode.Nodes = eNode.Nodes[:i]
  113. if sorted {
  114. sort.Sort(eNode.Nodes)
  115. }
  116. } else { // node is a file
  117. eNode.Value, _ = n.Read()
  118. }
  119. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  120. s.Stats.Inc(GetSuccess)
  121. return e, nil
  122. }
  123. // Create function creates the node at nodePath. Create will help to create intermediate directories with no ttl.
  124. // If the node has already existed, create will fail.
  125. // If any node on the path is a file, create will fail.
  126. func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
  127. s.worldLock.Lock()
  128. defer s.worldLock.Unlock()
  129. e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
  130. if err == nil {
  131. s.Stats.Inc(CreateSuccess)
  132. } else {
  133. s.Stats.Inc(CreateFail)
  134. }
  135. return e, err
  136. }
  137. // Set function creates or replace the node at nodePath.
  138. func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
  139. s.worldLock.Lock()
  140. defer s.worldLock.Unlock()
  141. e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
  142. if err == nil {
  143. s.Stats.Inc(SetSuccess)
  144. } else {
  145. s.Stats.Inc(SetFail)
  146. }
  147. return e, err
  148. }
  149. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  150. value string, expireTime time.Time) (*Event, error) {
  151. nodePath = path.Clean(path.Join("/", nodePath))
  152. // we do not allow the user to change "/"
  153. if nodePath == "/" {
  154. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  155. }
  156. s.worldLock.Lock()
  157. defer s.worldLock.Unlock()
  158. n, err := s.internalGet(nodePath)
  159. if err != nil {
  160. s.Stats.Inc(CompareAndSwapFail)
  161. return nil, err
  162. }
  163. if n.IsDir() { // can only test and set file
  164. s.Stats.Inc(CompareAndSwapFail)
  165. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  166. }
  167. // If both of the prevValue and prevIndex are given, we will test both of them.
  168. // Command will be executed, only if both of the tests are successful.
  169. if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
  170. // update etcd index
  171. s.CurrentIndex++
  172. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  173. eNode := e.Node
  174. eNode.PrevValue = n.Value
  175. // if test succeed, write the value
  176. n.Write(value, s.CurrentIndex)
  177. n.UpdateTTL(expireTime)
  178. eNode.Value = value
  179. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  180. s.WatcherHub.notify(e)
  181. s.Stats.Inc(CompareAndSwapSuccess)
  182. return e, nil
  183. }
  184. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  185. s.Stats.Inc(CompareAndSwapFail)
  186. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  187. }
  188. // Delete function deletes the node at the given path.
  189. // If the node is a directory, recursive must be true to delete it.
  190. func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
  191. nodePath = path.Clean(path.Join("/", nodePath))
  192. // we do not allow the user to change "/"
  193. if nodePath == "/" {
  194. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  195. }
  196. s.worldLock.Lock()
  197. defer s.worldLock.Unlock()
  198. nextIndex := s.CurrentIndex + 1
  199. n, err := s.internalGet(nodePath)
  200. if err != nil { // if the node does not exist, return error
  201. s.Stats.Inc(DeleteFail)
  202. return nil, err
  203. }
  204. e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  205. eNode := e.Node
  206. if n.IsDir() {
  207. eNode.Dir = true
  208. } else {
  209. eNode.PrevValue = n.Value
  210. }
  211. callback := func(path string) { // notify function
  212. // notify the watchers with delted set true
  213. s.WatcherHub.notifyWatchers(e, path, true)
  214. }
  215. err = n.Remove(recursive, callback)
  216. if err != nil {
  217. s.Stats.Inc(DeleteFail)
  218. return nil, err
  219. }
  220. // update etcd index
  221. s.CurrentIndex++
  222. s.WatcherHub.notify(e)
  223. s.Stats.Inc(DeleteSuccess)
  224. return e, nil
  225. }
  226. func (s *store) Watch(key string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
  227. key = path.Clean(path.Join("/", key))
  228. nextIndex := s.CurrentIndex + 1
  229. s.worldLock.RLock()
  230. defer s.worldLock.RUnlock()
  231. var c <-chan *Event
  232. var err *etcdErr.Error
  233. if sinceIndex == 0 {
  234. c, err = s.WatcherHub.watch(key, recursive, nextIndex)
  235. } else {
  236. c, err = s.WatcherHub.watch(key, recursive, sinceIndex)
  237. }
  238. if err != nil {
  239. // watchhub do not know the current Index
  240. // we need to attach the currentIndex here
  241. err.Index = s.CurrentIndex
  242. return nil, err
  243. }
  244. return c, nil
  245. }
  246. // walk function walks all the nodePath and apply the walkFunc on each directory
  247. func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
  248. components := strings.Split(nodePath, "/")
  249. curr := s.Root
  250. var err *etcdErr.Error
  251. for i := 1; i < len(components); i++ {
  252. if len(components[i]) == 0 { // ignore empty string
  253. return curr, nil
  254. }
  255. curr, err = walkFunc(curr, components[i])
  256. if err != nil {
  257. return nil, err
  258. }
  259. }
  260. return curr, nil
  261. }
  262. // Update function updates the value/ttl of the node.
  263. // If the node is a file, the value and the ttl can be updated.
  264. // If the node is a directory, only the ttl can be updated.
  265. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  266. nodePath = path.Clean(path.Join("/", nodePath))
  267. // we do not allow the user to change "/"
  268. if nodePath == "/" {
  269. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  270. }
  271. s.worldLock.Lock()
  272. defer s.worldLock.Unlock()
  273. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  274. n, err := s.internalGet(nodePath)
  275. if err != nil { // if the node does not exist, return error
  276. s.Stats.Inc(UpdateFail)
  277. return nil, err
  278. }
  279. e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
  280. eNode := e.Node
  281. if len(newValue) != 0 {
  282. if n.IsDir() {
  283. // if the node is a directory, we cannot update value
  284. s.Stats.Inc(UpdateFail)
  285. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  286. }
  287. eNode.PrevValue = n.Value
  288. n.Write(newValue, nextIndex)
  289. eNode.Value = newValue
  290. } else {
  291. // do not update value
  292. eNode.Value = n.Value
  293. }
  294. // update ttl
  295. n.UpdateTTL(expireTime)
  296. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  297. s.WatcherHub.notify(e)
  298. s.Stats.Inc(UpdateSuccess)
  299. s.CurrentIndex = nextIndex
  300. return e, nil
  301. }
  302. func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
  303. expireTime time.Time, action string) (*Event, error) {
  304. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  305. if unique { // append unique item under the node path
  306. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  307. }
  308. nodePath = path.Clean(path.Join("/", nodePath))
  309. // we do not allow the user to change "/"
  310. if nodePath == "/" {
  311. return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  312. }
  313. // Assume expire times that are way in the past are not valid.
  314. // This can occur when the time is serialized to JSON and read back in.
  315. if expireTime.Before(minExpireTime) {
  316. expireTime = Permanent
  317. }
  318. dir, newNodeName := path.Split(nodePath)
  319. // walk through the nodePath, create dirs and get the last directory node
  320. d, err := s.walk(dir, s.checkDir)
  321. if err != nil {
  322. s.Stats.Inc(SetFail)
  323. err.Index = currIndex
  324. return nil, err
  325. }
  326. e := newEvent(action, nodePath, nextIndex, nextIndex)
  327. eNode := e.Node
  328. n, _ := d.GetChild(newNodeName)
  329. // force will try to replace a existing file
  330. if n != nil {
  331. if replace {
  332. if n.IsDir() {
  333. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  334. }
  335. eNode.PrevValue, _ = n.Read()
  336. n.Remove(false, nil)
  337. } else {
  338. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  339. }
  340. }
  341. if len(value) != 0 { // create file
  342. eNode.Value = value
  343. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  344. } else { // create directory
  345. eNode.Dir = true
  346. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  347. }
  348. // we are sure d is a directory and does not have the children with name n.Name
  349. d.Add(n)
  350. // node with TTL
  351. if !n.IsPermanent() {
  352. s.ttlKeyHeap.push(n)
  353. eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
  354. }
  355. s.CurrentIndex = nextIndex
  356. s.WatcherHub.notify(e)
  357. return e, nil
  358. }
  359. // InternalGet function get the node of the given nodePath.
  360. func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
  361. nodePath = path.Clean(path.Join("/", nodePath))
  362. walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
  363. if !parent.IsDir() {
  364. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  365. return nil, err
  366. }
  367. child, ok := parent.Children[name]
  368. if ok {
  369. return child, nil
  370. }
  371. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  372. }
  373. f, err := s.walk(nodePath, walkFunc)
  374. if err != nil {
  375. return nil, err
  376. }
  377. return f, nil
  378. }
  379. // deleteExpiredKyes will delete all
  380. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  381. s.worldLock.Lock()
  382. defer s.worldLock.Unlock()
  383. for {
  384. node := s.ttlKeyHeap.top()
  385. if node == nil || node.ExpireTime.After(cutoff) {
  386. break
  387. }
  388. s.ttlKeyHeap.pop()
  389. node.Remove(true, nil)
  390. s.CurrentIndex++
  391. s.Stats.Inc(ExpireCount)
  392. s.WatcherHub.notify(newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex))
  393. }
  394. }
  395. // checkDir function will check whether the component is a directory under parent node.
  396. // If it is a directory, this function will return the pointer to that node.
  397. // If it does not exist, this function will create a new directory and return the pointer to that node.
  398. // If it is a file, this function will return error.
  399. func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  400. node, ok := parent.Children[dirName]
  401. if ok {
  402. if node.IsDir() {
  403. return node, nil
  404. }
  405. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  406. }
  407. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  408. parent.Children[dirName] = n
  409. return n, nil
  410. }
  411. // Save function saves the static state of the store system.
  412. // Save function will not be able to save the state of watchers.
  413. // Save function will not save the parent field of the node. Or there will
  414. // be cyclic dependencies issue for the json package.
  415. func (s *store) Save() ([]byte, error) {
  416. s.worldLock.Lock()
  417. clonedStore := newStore()
  418. clonedStore.CurrentIndex = s.CurrentIndex
  419. clonedStore.Root = s.Root.Clone()
  420. clonedStore.WatcherHub = s.WatcherHub.clone()
  421. clonedStore.Stats = s.Stats.clone()
  422. clonedStore.CurrentVersion = s.CurrentVersion
  423. s.worldLock.Unlock()
  424. b, err := json.Marshal(clonedStore)
  425. if err != nil {
  426. return nil, err
  427. }
  428. return b, nil
  429. }
  430. // recovery function recovery the store system from a static state.
  431. // It needs to recovery the parent field of the nodes.
  432. // It needs to delete the expired nodes since the saved time and also
  433. // need to create monitor go routines.
  434. func (s *store) Recovery(state []byte) error {
  435. s.worldLock.Lock()
  436. defer s.worldLock.Unlock()
  437. err := json.Unmarshal(state, s)
  438. if err != nil {
  439. return err
  440. }
  441. s.ttlKeyHeap = newTtlKeyHeap()
  442. s.Root.recoverAndclean()
  443. return nil
  444. }
  445. func (s *store) JsonStats() []byte {
  446. s.Stats.Watchers = uint64(s.WatcherHub.count)
  447. return s.Stats.toJson()
  448. }
  449. func (s *store) TotalTransactions() uint64 {
  450. return s.Stats.TotalTranscations()
  451. }