store.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. etcdErr "github.com/coreos/etcd/error"
  12. )
  // defaultVersion is the data-format version assigned to a store when it is
// first initialized by newStore.
const (
	defaultVersion = 2
)
  15. type Store interface {
  16. Version() int
  17. CommandFactory() CommandFactory
  18. Index() uint64
  19. Get(nodePath string, recursive, sorted bool) (*Event, error)
  20. Set(nodePath string, value string, expireTime time.Time) (*Event, error)
  21. Update(nodePath string, newValue string, expireTime time.Time) (*Event, error)
  22. Create(nodePath string, value string, incrementalSuffix bool,
  23. expireTime time.Time) (*Event, error)
  24. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  25. value string, expireTime time.Time) (*Event, error)
  26. Delete(nodePath string, recursive bool) (*Event, error)
  27. Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error)
  28. Save() ([]byte, error)
  29. Recovery(state []byte) error
  30. TotalTransactions() uint64
  31. JsonStats() []byte
  32. DeleteExpiredKeys(cutoff time.Time)
  33. }
  34. type store struct {
  35. Root *Node
  36. WatcherHub *watcherHub
  37. CurrentIndex uint64
  38. Stats *Stats
  39. CurrentVersion int
  40. ttlKeyHeap *ttlKeyHeap // need to recovery manually
  41. worldLock sync.RWMutex // stop the world lock
  42. }
  43. func New() Store {
  44. return newStore()
  45. }
  46. func newStore() *store {
  47. s := new(store)
  48. s.CurrentVersion = defaultVersion
  49. s.Root = newDir(s, "/", s.CurrentIndex, nil, "", Permanent)
  50. s.Stats = newStats()
  51. s.WatcherHub = newWatchHub(1000)
  52. s.ttlKeyHeap = newTtlKeyHeap()
  53. return s
  54. }
  55. // Version retrieves current version of the store.
  56. func (s *store) Version() int {
  57. return s.CurrentVersion
  58. }
  59. // Retrieves current of the store
  60. func (s *store) Index() uint64 {
  61. return s.CurrentIndex
  62. }
  63. // CommandFactory retrieves the command factory for the current version of the store.
  64. func (s *store) CommandFactory() CommandFactory {
  65. return GetCommandFactory(s.Version())
  66. }
  67. // Get function returns a get event.
  68. // If recursive is true, it will return all the content under the node path.
  69. // If sorted is true, it will sort the content by keys.
  70. func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
  71. s.worldLock.RLock()
  72. defer s.worldLock.RUnlock()
  73. nodePath = path.Clean(path.Join("/", nodePath))
  74. n, err := s.internalGet(nodePath)
  75. if err != nil {
  76. s.Stats.Inc(GetFail)
  77. return nil, err
  78. }
  79. e := newEvent(Get, nodePath, n.ModifiedIndex)
  80. if n.IsDir() { // node is a directory
  81. e.Dir = true
  82. children, _ := n.List()
  83. e.KVPairs = make([]KeyValuePair, len(children))
  84. // we do not use the index in the children slice directly
  85. // we need to skip the hidden one
  86. i := 0
  87. for _, child := range children {
  88. if child.IsHidden() { // get will not return hidden nodes
  89. continue
  90. }
  91. e.KVPairs[i] = child.Pair(recursive, sorted)
  92. i++
  93. }
  94. // eliminate hidden nodes
  95. e.KVPairs = e.KVPairs[:i]
  96. if sorted {
  97. sort.Sort(e.KVPairs)
  98. }
  99. } else { // node is a file
  100. e.Value, _ = n.Read()
  101. }
  102. e.Expiration, e.TTL = n.ExpirationAndTTL()
  103. s.Stats.Inc(GetSuccess)
  104. return e, nil
  105. }
  106. // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
  107. // If the node has already existed, create will fail.
  108. // If any node on the path is a file, create will fail.
  109. func (s *store) Create(nodePath string, value string, unique bool, expireTime time.Time) (*Event, error) {
  110. nodePath = path.Clean(path.Join("/", nodePath))
  111. s.worldLock.Lock()
  112. defer s.worldLock.Unlock()
  113. e, err := s.internalCreate(nodePath, value, unique, false, expireTime, Create)
  114. if err == nil {
  115. s.Stats.Inc(CreateSuccess)
  116. } else {
  117. s.Stats.Inc(CreateFail)
  118. }
  119. return e, err
  120. }
  121. // Set function creates or replace the Node at nodePath.
  122. func (s *store) Set(nodePath string, value string, expireTime time.Time) (*Event, error) {
  123. nodePath = path.Clean(path.Join("/", nodePath))
  124. s.worldLock.Lock()
  125. defer s.worldLock.Unlock()
  126. e, err := s.internalCreate(nodePath, value, false, true, expireTime, Set)
  127. if err == nil {
  128. s.Stats.Inc(SetSuccess)
  129. } else {
  130. s.Stats.Inc(SetFail)
  131. }
  132. return e, err
  133. }
  134. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  135. value string, expireTime time.Time) (*Event, error) {
  136. nodePath = path.Clean(path.Join("/", nodePath))
  137. s.worldLock.Lock()
  138. defer s.worldLock.Unlock()
  139. n, err := s.internalGet(nodePath)
  140. if err != nil {
  141. s.Stats.Inc(CompareAndSwapFail)
  142. return nil, err
  143. }
  144. if n.IsDir() { // can only test and set file
  145. s.Stats.Inc(CompareAndSwapFail)
  146. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  147. }
  148. // If both of the prevValue and prevIndex are given, we will test both of them.
  149. // Command will be executed, only if both of the tests are successful.
  150. if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
  151. // update etcd index
  152. s.CurrentIndex++
  153. e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex)
  154. e.PrevValue = n.Value
  155. // if test succeed, write the value
  156. n.Write(value, s.CurrentIndex)
  157. n.UpdateTTL(expireTime)
  158. e.Value = value
  159. e.Expiration, e.TTL = n.ExpirationAndTTL()
  160. s.WatcherHub.notify(e)
  161. s.Stats.Inc(CompareAndSwapSuccess)
  162. return e, nil
  163. }
  164. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  165. s.Stats.Inc(CompareAndSwapFail)
  166. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  167. }
  168. // Delete function deletes the node at the given path.
  169. // If the node is a directory, recursive must be true to delete it.
  170. func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
  171. nodePath = path.Clean(path.Join("/", nodePath))
  172. s.worldLock.Lock()
  173. defer s.worldLock.Unlock()
  174. nextIndex := s.CurrentIndex + 1
  175. n, err := s.internalGet(nodePath)
  176. if err != nil { // if the node does not exist, return error
  177. s.Stats.Inc(DeleteFail)
  178. return nil, err
  179. }
  180. e := newEvent(Delete, nodePath, nextIndex)
  181. if n.IsDir() {
  182. e.Dir = true
  183. } else {
  184. e.PrevValue = n.Value
  185. }
  186. callback := func(path string) { // notify function
  187. // notify the watchers with delted set true
  188. s.WatcherHub.notifyWatchers(e, path, true)
  189. }
  190. err = n.Remove(recursive, callback)
  191. if err != nil {
  192. s.Stats.Inc(DeleteFail)
  193. return nil, err
  194. }
  195. // update etcd index
  196. s.CurrentIndex++
  197. s.WatcherHub.notify(e)
  198. s.Stats.Inc(DeleteSuccess)
  199. return e, nil
  200. }
  201. func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64) (<-chan *Event, error) {
  202. prefix = path.Clean(path.Join("/", prefix))
  203. nextIndex := s.CurrentIndex + 1
  204. s.worldLock.RLock()
  205. defer s.worldLock.RUnlock()
  206. var c <-chan *Event
  207. var err *etcdErr.Error
  208. if sinceIndex == 0 {
  209. c, err = s.WatcherHub.watch(prefix, recursive, nextIndex)
  210. } else {
  211. c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
  212. }
  213. if err != nil {
  214. // watchhub do not know the current Index
  215. // we need to attach the currentIndex here
  216. err.Index = s.CurrentIndex
  217. return nil, err
  218. }
  219. return c, nil
  220. }
  221. // walk function walks all the nodePath and apply the walkFunc on each directory
  222. func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) {
  223. components := strings.Split(nodePath, "/")
  224. curr := s.Root
  225. var err *etcdErr.Error
  226. for i := 1; i < len(components); i++ {
  227. if len(components[i]) == 0 { // ignore empty string
  228. return curr, nil
  229. }
  230. curr, err = walkFunc(curr, components[i])
  231. if err != nil {
  232. return nil, err
  233. }
  234. }
  235. return curr, nil
  236. }
  237. // Update function updates the value/ttl of the node.
  238. // If the node is a file, the value and the ttl can be updated.
  239. // If the node is a directory, only the ttl can be updated.
  240. func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (*Event, error) {
  241. s.worldLock.Lock()
  242. defer s.worldLock.Unlock()
  243. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  244. nodePath = path.Clean(path.Join("/", nodePath))
  245. n, err := s.internalGet(nodePath)
  246. if err != nil { // if the node does not exist, return error
  247. s.Stats.Inc(UpdateFail)
  248. return nil, err
  249. }
  250. e := newEvent(Update, nodePath, nextIndex)
  251. if len(newValue) != 0 {
  252. if n.IsDir() {
  253. // if the node is a directory, we cannot update value
  254. s.Stats.Inc(UpdateFail)
  255. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  256. }
  257. e.PrevValue = n.Value
  258. n.Write(newValue, nextIndex)
  259. e.Value = newValue
  260. } else {
  261. // do not update value
  262. e.Value = n.Value
  263. }
  264. // update ttl
  265. n.UpdateTTL(expireTime)
  266. e.Expiration, e.TTL = n.ExpirationAndTTL()
  267. s.WatcherHub.notify(e)
  268. s.Stats.Inc(UpdateSuccess)
  269. s.CurrentIndex = nextIndex
  270. return e, nil
  271. }
  272. func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
  273. expireTime time.Time, action string) (*Event, error) {
  274. currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
  275. if unique { // append unique item under the node path
  276. nodePath += "/" + strconv.FormatUint(nextIndex, 10)
  277. }
  278. nodePath = path.Clean(path.Join("/", nodePath))
  279. dir, newNodeName := path.Split(nodePath)
  280. // walk through the nodePath, create dirs and get the last directory node
  281. d, err := s.walk(dir, s.checkDir)
  282. if err != nil {
  283. s.Stats.Inc(SetFail)
  284. err.Index = currIndex
  285. return nil, err
  286. }
  287. e := newEvent(action, nodePath, nextIndex)
  288. n, _ := d.GetChild(newNodeName)
  289. // force will try to replace a existing file
  290. if n != nil {
  291. if replace {
  292. if n.IsDir() {
  293. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  294. }
  295. e.PrevValue, _ = n.Read()
  296. n.Remove(false, nil)
  297. } else {
  298. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
  299. }
  300. }
  301. if len(value) != 0 { // create file
  302. e.Value = value
  303. n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
  304. } else { // create directory
  305. e.Dir = true
  306. n = newDir(s, nodePath, nextIndex, d, "", expireTime)
  307. }
  308. // we are sure d is a directory and does not have the children with name n.Name
  309. d.Add(n)
  310. // Node with TTL
  311. if !n.IsPermanent() {
  312. s.ttlKeyHeap.push(n)
  313. e.Expiration, e.TTL = n.ExpirationAndTTL()
  314. }
  315. s.CurrentIndex = nextIndex
  316. s.WatcherHub.notify(e)
  317. return e, nil
  318. }
  319. // InternalGet function get the node of the given nodePath.
  320. func (s *store) internalGet(nodePath string) (*Node, *etcdErr.Error) {
  321. nodePath = path.Clean(path.Join("/", nodePath))
  322. walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
  323. if !parent.IsDir() {
  324. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  325. return nil, err
  326. }
  327. child, ok := parent.Children[name]
  328. if ok {
  329. return child, nil
  330. }
  331. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
  332. }
  333. f, err := s.walk(nodePath, walkFunc)
  334. if err != nil {
  335. return nil, err
  336. }
  337. return f, nil
  338. }
  339. // deleteExpiredKyes will delete all
  340. func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  341. s.worldLock.Lock()
  342. defer s.worldLock.Unlock()
  343. for {
  344. node := s.ttlKeyHeap.top()
  345. if node == nil || node.ExpireTime.After(cutoff) {
  346. break
  347. }
  348. s.ttlKeyHeap.pop()
  349. node.Remove(true, nil)
  350. s.CurrentIndex++
  351. s.Stats.Inc(ExpireCount)
  352. s.WatcherHub.notify(newEvent(Expire, node.Path, s.CurrentIndex))
  353. }
  354. }
  355. // checkDir function will check whether the component is a directory under parent node.
  356. // If it is a directory, this function will return the pointer to that node.
  357. // If it does not exist, this function will create a new directory and return the pointer to that node.
  358. // If it is a file, this function will return error.
  359. func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
  360. node, ok := parent.Children[dirName]
  361. if ok {
  362. if node.IsDir() {
  363. return node, nil
  364. }
  365. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
  366. }
  367. n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, parent.ACL, Permanent)
  368. parent.Children[dirName] = n
  369. return n, nil
  370. }
  371. // Save function saves the static state of the store system.
  372. // Save function will not be able to save the state of watchers.
  373. // Save function will not save the parent field of the node. Or there will
  374. // be cyclic dependencies issue for the json package.
  375. func (s *store) Save() ([]byte, error) {
  376. s.worldLock.Lock()
  377. clonedStore := newStore()
  378. clonedStore.CurrentIndex = s.CurrentIndex
  379. clonedStore.Root = s.Root.Clone()
  380. clonedStore.WatcherHub = s.WatcherHub.clone()
  381. clonedStore.Stats = s.Stats.clone()
  382. clonedStore.CurrentVersion = s.CurrentVersion
  383. s.worldLock.Unlock()
  384. b, err := json.Marshal(clonedStore)
  385. if err != nil {
  386. return nil, err
  387. }
  388. return b, nil
  389. }
  390. // recovery function recovery the store system from a static state.
  391. // It needs to recovery the parent field of the nodes.
  392. // It needs to delete the expired nodes since the saved time and also
  393. // need to create monitor go routines.
  394. func (s *store) Recovery(state []byte) error {
  395. s.worldLock.Lock()
  396. defer s.worldLock.Unlock()
  397. err := json.Unmarshal(state, s)
  398. if err != nil {
  399. return err
  400. }
  401. s.ttlKeyHeap = newTtlKeyHeap()
  402. s.Root.recoverAndclean()
  403. return nil
  404. }
  405. func (s *store) JsonStats() []byte {
  406. s.Stats.Watchers = uint64(s.WatcherHub.count)
  407. return s.Stats.toJson()
  408. }
  409. func (s *store) TotalTransactions() uint64 {
  410. return s.Stats.TotalTranscations()
  411. }