store.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package store
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "path"
  18. "sort"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. etcdErr "github.com/coreos/etcd/error"
  24. )
// defaultVersion is the version assigned to a store when it is first
// initialized by newStore.
const defaultVersion = 2
// Store is the interface of the hierarchical key-value store implemented in
// this package. Every operation that takes an index and term receives the
// index/term of the command being applied; they are stamped on the resulting
// event or error.
type Store interface {
	// Version reports the current version of the store.
	Version() int
	// CommandFactory returns the command factory for the store's version.
	CommandFactory() CommandFactory
	// Get returns a Get event for the node at nodePath. For directories,
	// recursive and sorted control how the children are listed.
	Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
	// Set creates the node at nodePath, replacing an existing file.
	Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
	// Update changes the value and/or TTL of an existing node.
	Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
	// Create creates a new node at nodePath and fails if it already exists.
	// With incrementalSuffix set, the node is created under nodePath with
	// the index as its name.
	Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time,
		index uint64, term uint64) (*Event, error)
	// CompareAndSwap writes value to the file at nodePath only if the
	// supplied prevValue and prevIndex conditions hold.
	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
		value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
	// Delete removes the node at nodePath.
	Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error)
	// Watch returns a channel on which events for prefix are delivered.
	Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error)
	// Save serializes the current state of the store.
	Save() ([]byte, error)
	// Recovery restores the store from a state produced by Save.
	Recovery(state []byte) error
	// TotalTransactions returns the transaction count kept by the store's
	// statistics.
	TotalTransactions() uint64
	// JsonStats returns the store's statistics in encoded form.
	JsonStats() []byte
}
// store is the Store implementation of this package. Save marshals a copy of
// it with encoding/json and Recovery unmarshals into it, so the exported
// fields make up the persisted state; worldLock is unexported and is
// therefore not serialized.
type store struct {
	// Root is the root directory node ("/") of the tree.
	Root *Node
	// WatcherHub receives a notification for every change made to the store.
	WatcherHub *watcherHub
	// Index and Term hold the index and term most recently recorded by a
	// store operation.
	Index uint64
	Term  uint64
	// Stats collects the success/failure counters of the store operations.
	Stats *Stats
	// CurrentVersion is the version reported by Version.
	CurrentVersion int
	worldLock      sync.RWMutex // stop the world lock
}
// New returns a newly initialized store (see newStore) as a Store.
func New() Store {
	return newStore()
}
  56. func newStore() *store {
  57. s := new(store)
  58. s.CurrentVersion = defaultVersion
  59. s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent)
  60. s.Stats = newStats()
  61. s.WatcherHub = newWatchHub(1000)
  62. return s
  63. }
// Version returns the current version of the store (the CurrentVersion field).
func (s *store) Version() int {
	return s.CurrentVersion
}
  68. // CommandFactory retrieves the command factory for the current version of the store.
  69. func (s *store) CommandFactory() CommandFactory {
  70. return GetCommandFactory(s.Version())
  71. }
  72. // Get function returns a get event.
  73. // If recursive is true, it will return all the content under the node path.
  74. // If sorted is true, it will sort the content by keys.
  75. func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
  76. s.worldLock.RLock()
  77. defer s.worldLock.RUnlock()
  78. nodePath = path.Clean(path.Join("/", nodePath))
  79. n, err := s.internalGet(nodePath, index, term)
  80. if err != nil {
  81. s.Stats.Inc(GetFail)
  82. return nil, err
  83. }
  84. e := newEvent(Get, nodePath, index, term)
  85. if n.IsDir() { // node is a directory
  86. e.Dir = true
  87. children, _ := n.List()
  88. e.KVPairs = make([]KeyValuePair, len(children))
  89. // we do not use the index in the children slice directly
  90. // we need to skip the hidden one
  91. i := 0
  92. for _, child := range children {
  93. if child.IsHidden() { // get will not return hidden nodes
  94. continue
  95. }
  96. e.KVPairs[i] = child.Pair(recursive, sorted)
  97. i++
  98. }
  99. // eliminate hidden nodes
  100. e.KVPairs = e.KVPairs[:i]
  101. if sorted {
  102. sort.Sort(e.KVPairs)
  103. }
  104. } else { // node is a file
  105. e.Value, _ = n.Read()
  106. }
  107. e.Expiration, e.TTL = n.ExpirationAndTTL()
  108. s.Stats.Inc(GetSuccess)
  109. return e, nil
  110. }
  111. // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
  112. // If the node has already existed, create will fail.
  113. // If any node on the path is a file, create will fail.
  114. func (s *store) Create(nodePath string, value string, unique bool,
  115. expireTime time.Time, index uint64, term uint64) (*Event, error) {
  116. nodePath = path.Clean(path.Join("/", nodePath))
  117. s.worldLock.Lock()
  118. defer s.worldLock.Unlock()
  119. e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create)
  120. if err == nil {
  121. s.Stats.Inc(CreateSuccess)
  122. } else {
  123. s.Stats.Inc(CreateFail)
  124. }
  125. return e, err
  126. }
  127. // Set function creates or replace the Node at nodePath.
  128. func (s *store) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  129. nodePath = path.Clean(path.Join("/", nodePath))
  130. s.worldLock.Lock()
  131. defer s.worldLock.Unlock()
  132. e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set)
  133. if err == nil {
  134. s.Stats.Inc(SetSuccess)
  135. } else {
  136. s.Stats.Inc(SetFail)
  137. }
  138. return e, err
  139. }
  140. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  141. value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  142. nodePath = path.Clean(path.Join("/", nodePath))
  143. s.worldLock.Lock()
  144. defer s.worldLock.Unlock()
  145. n, err := s.internalGet(nodePath, index, term)
  146. if err != nil {
  147. s.Stats.Inc(CompareAndSwapFail)
  148. return nil, err
  149. }
  150. if n.IsDir() { // can only test and set file
  151. s.Stats.Inc(CompareAndSwapFail)
  152. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  153. }
  154. // If both of the prevValue and prevIndex are given, we will test both of them.
  155. // Command will be executed, only if both of the tests are successful.
  156. if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
  157. e := newEvent(CompareAndSwap, nodePath, index, term)
  158. e.PrevValue = n.Value
  159. // if test succeed, write the value
  160. n.Write(value, index, term)
  161. n.UpdateTTL(expireTime)
  162. e.Value = value
  163. e.Expiration, e.TTL = n.ExpirationAndTTL()
  164. s.WatcherHub.notify(e)
  165. s.Stats.Inc(CompareAndSwapSuccess)
  166. return e, nil
  167. }
  168. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  169. s.Stats.Inc(CompareAndSwapFail)
  170. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term)
  171. }
  172. // Delete function deletes the node at the given path.
  173. // If the node is a directory, recursive must be true to delete it.
  174. func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
  175. nodePath = path.Clean(path.Join("/", nodePath))
  176. s.worldLock.Lock()
  177. defer s.worldLock.Unlock()
  178. n, err := s.internalGet(nodePath, index, term)
  179. if err != nil { // if the node does not exist, return error
  180. s.Stats.Inc(DeleteFail)
  181. return nil, err
  182. }
  183. e := newEvent(Delete, nodePath, index, term)
  184. if n.IsDir() {
  185. e.Dir = true
  186. } else {
  187. e.PrevValue = n.Value
  188. }
  189. callback := func(path string) { // notify function
  190. // notify the watchers with delted set true
  191. s.WatcherHub.notifyWatchers(e, path, true)
  192. }
  193. err = n.Remove(recursive, callback)
  194. if err != nil {
  195. s.Stats.Inc(DeleteFail)
  196. return nil, err
  197. }
  198. s.WatcherHub.notify(e)
  199. s.Stats.Inc(DeleteSuccess)
  200. return e, nil
  201. }
  202. func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
  203. prefix = path.Clean(path.Join("/", prefix))
  204. s.worldLock.RLock()
  205. defer s.worldLock.RUnlock()
  206. s.Index, s.Term = index, term
  207. var c <-chan *Event
  208. var err *etcdErr.Error
  209. if sinceIndex == 0 {
  210. c, err = s.WatcherHub.watch(prefix, recursive, index+1)
  211. } else {
  212. c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
  213. }
  214. if err != nil {
  215. err.Index = index
  216. err.Term = term
  217. return nil, err
  218. }
  219. return c, nil
  220. }
  221. // walk function walks all the nodePath and apply the walkFunc on each directory
  222. func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) {
  223. components := strings.Split(nodePath, "/")
  224. curr := s.Root
  225. var err *etcdErr.Error
  226. for i := 1; i < len(components); i++ {
  227. if len(components[i]) == 0 { // ignore empty string
  228. return curr, nil
  229. }
  230. curr, err = walkFunc(curr, components[i])
  231. if err != nil {
  232. return nil, err
  233. }
  234. }
  235. return curr, nil
  236. }
  237. // Update function updates the value/ttl of the node.
  238. // If the node is a file, the value and the ttl can be updated.
  239. // If the node is a directory, only the ttl can be updated.
  240. func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  241. s.worldLock.Lock()
  242. defer s.worldLock.Unlock()
  243. nodePath = path.Clean(path.Join("/", nodePath))
  244. n, err := s.internalGet(nodePath, index, term)
  245. if err != nil { // if the node does not exist, return error
  246. s.Stats.Inc(UpdateFail)
  247. return nil, err
  248. }
  249. e := newEvent(Update, nodePath, s.Index, s.Term)
  250. if len(newValue) != 0 {
  251. if n.IsDir() {
  252. // if the node is a directory, we cannot update value
  253. s.Stats.Inc(UpdateFail)
  254. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  255. }
  256. e.PrevValue = n.Value
  257. n.Write(newValue, index, term)
  258. }
  259. // update ttl
  260. n.UpdateTTL(expireTime)
  261. e.Value = newValue
  262. e.Expiration, e.TTL = n.ExpirationAndTTL()
  263. s.WatcherHub.notify(e)
  264. s.Stats.Inc(UpdateSuccess)
  265. return e, nil
  266. }
  267. func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
  268. expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
  269. s.Index, s.Term = index, term
  270. if unique { // append unique item under the node path
  271. nodePath += "/" + strconv.FormatUint(index, 10)
  272. }
  273. nodePath = path.Clean(path.Join("/", nodePath))
  274. dir, newNodeName := path.Split(nodePath)
  275. // walk through the nodePath, create dirs and get the last directory node
  276. d, err := s.walk(dir, s.checkDir)
  277. if err != nil {
  278. s.Stats.Inc(SetFail)
  279. err.Index, err.Term = s.Index, s.Term
  280. return nil, err
  281. }
  282. e := newEvent(action, nodePath, s.Index, s.Term)
  283. n, _ := d.GetChild(newNodeName)
  284. // force will try to replace a existing file
  285. if n != nil {
  286. if replace {
  287. if n.IsDir() {
  288. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  289. }
  290. e.PrevValue, _ = n.Read()
  291. n.Remove(false, nil)
  292. } else {
  293. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
  294. }
  295. }
  296. if len(value) != 0 { // create file
  297. e.Value = value
  298. n = newKV(s, nodePath, value, index, term, d, "", expireTime)
  299. } else { // create directory
  300. e.Dir = true
  301. n = newDir(s, nodePath, index, term, d, "", expireTime)
  302. }
  303. // we are sure d is a directory and does not have the children with name n.Name
  304. d.Add(n)
  305. // Node with TTL
  306. if expireTime.Sub(Permanent) != 0 {
  307. n.Expire()
  308. e.Expiration, e.TTL = n.ExpirationAndTTL()
  309. }
  310. s.WatcherHub.notify(e)
  311. return e, nil
  312. }
  313. // InternalGet function get the node of the given nodePath.
  314. func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) {
  315. nodePath = path.Clean(path.Join("/", nodePath))
  316. // update file system known index and term
  317. if index > s.Index {
  318. s.Index, s.Term = index, term
  319. }
  320. walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
  321. if !parent.IsDir() {
  322. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, index, term)
  323. return nil, err
  324. }
  325. child, ok := parent.Children[name]
  326. if ok {
  327. return child, nil
  328. }
  329. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), index, term)
  330. }
  331. f, err := s.walk(nodePath, walkFunc)
  332. if err != nil {
  333. return nil, err
  334. }
  335. return f, nil
  336. }
  337. // checkDir function will check whether the component is a directory under parent node.
  338. // If it is a directory, this function will return the pointer to that node.
  339. // If it does not exist, this function will create a new directory and return the pointer to that node.
  340. // If it is a file, this function will return error.
  341. func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
  342. node, ok := parent.Children[dirName]
  343. if ok {
  344. if node.IsDir() {
  345. return node, nil
  346. }
  347. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm)
  348. }
  349. n := newDir(s, path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)
  350. parent.Children[dirName] = n
  351. return n, nil
  352. }
  353. // Save function saves the static state of the store system.
  354. // Save function will not be able to save the state of watchers.
  355. // Save function will not save the parent field of the node. Or there will
  356. // be cyclic dependencies issue for the json package.
  357. func (s *store) Save() ([]byte, error) {
  358. s.worldLock.Lock()
  359. clonedStore := newStore()
  360. clonedStore.Index = s.Index
  361. clonedStore.Term = s.Term
  362. clonedStore.Root = s.Root.Clone()
  363. clonedStore.WatcherHub = s.WatcherHub.clone()
  364. clonedStore.Stats = s.Stats.clone()
  365. clonedStore.CurrentVersion = s.CurrentVersion
  366. s.worldLock.Unlock()
  367. b, err := json.Marshal(clonedStore)
  368. if err != nil {
  369. return nil, err
  370. }
  371. return b, nil
  372. }
  373. // recovery function recovery the store system from a static state.
  374. // It needs to recovery the parent field of the nodes.
  375. // It needs to delete the expired nodes since the saved time and also
  376. // need to create monitor go routines.
  377. func (s *store) Recovery(state []byte) error {
  378. s.worldLock.Lock()
  379. defer s.worldLock.Unlock()
  380. err := json.Unmarshal(state, s)
  381. if err != nil {
  382. return err
  383. }
  384. s.Root.recoverAndclean()
  385. return nil
  386. }
  387. func (s *store) JsonStats() []byte {
  388. s.Stats.Watchers = uint64(s.WatcherHub.count)
  389. return s.Stats.toJson()
  390. }
// TotalTransactions returns the transaction total reported by the store's
// statistics.
func (s *store) TotalTransactions() uint64 {
	return s.Stats.TotalTranscations()
}