store.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. etcdErr "github.com/coreos/etcd/error"
  12. )
// defaultVersion is the version number newStore assigns to a store when it is
// first initialized.
const defaultVersion = 2
  15. type Store interface {
  16. Version() int
  17. CommandFactory() CommandFactory
  18. Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
  19. Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
  20. Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
  21. Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time,
  22. index uint64, term uint64) (*Event, error)
  23. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  24. value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
  25. Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error)
  26. Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error)
  27. Save() ([]byte, error)
  28. Recovery(state []byte) error
  29. JsonStats() []byte
  30. }
// store is the Store implementation in this file. Save encodes it with
// encoding/json and Recovery decodes into it, so the exported fields make up
// the persisted state; the unexported worldLock is not part of it.
type store struct {
	// Root is the root directory node; newStore creates it at "/".
	Root *Node
	// WatcherHub is notified of the events produced by successful mutations.
	WatcherHub *watcherHub
	// Index is the index most recently recorded from a command.
	Index uint64
	// Term is the term recorded together with Index.
	Term uint64
	// Stats holds the per-operation success and failure counters.
	Stats *Stats
	// CurrentVersion is the value reported by Version.
	CurrentVersion int
	worldLock sync.RWMutex // stop the world lock
}
// New returns a freshly initialized store behind the Store interface.
func New() Store {
	return newStore()
}
  43. func newStore() *store {
  44. s := new(store)
  45. s.CurrentVersion = defaultVersion
  46. s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent)
  47. s.Stats = newStats()
  48. s.WatcherHub = newWatchHub(1000)
  49. return s
  50. }
// Version returns the store's current version, i.e. the CurrentVersion field.
func (s *store) Version() int {
	return s.CurrentVersion
}
  55. // CommandFactory retrieves the command factory for the current version of the store.
  56. func (s *store) CommandFactory() CommandFactory {
  57. return GetCommandFactory(s.Version())
  58. }
  59. // Get function returns a get event.
  60. // If recursive is true, it will return all the content under the node path.
  61. // If sorted is true, it will sort the content by keys.
  62. func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
  63. s.worldLock.RLock()
  64. defer s.worldLock.RUnlock()
  65. nodePath = path.Clean(path.Join("/", nodePath))
  66. n, err := s.internalGet(nodePath, index, term)
  67. if err != nil {
  68. s.Stats.Inc(GetFail)
  69. return nil, err
  70. }
  71. e := newEvent(Get, nodePath, index, term)
  72. if n.IsDir() { // node is a directory
  73. e.Dir = true
  74. children, _ := n.List()
  75. e.KVPairs = make([]KeyValuePair, len(children))
  76. // we do not use the index in the children slice directly
  77. // we need to skip the hidden one
  78. i := 0
  79. for _, child := range children {
  80. if child.IsHidden() { // get will not return hidden nodes
  81. continue
  82. }
  83. e.KVPairs[i] = child.Pair(recursive, sorted)
  84. i++
  85. }
  86. // eliminate hidden nodes
  87. e.KVPairs = e.KVPairs[:i]
  88. if sorted {
  89. sort.Sort(e.KVPairs)
  90. }
  91. } else { // node is a file
  92. e.Value, _ = n.Read()
  93. }
  94. e.Expiration, e.TTL = n.ExpirationAndTTL()
  95. s.Stats.Inc(GetSuccess)
  96. return e, nil
  97. }
  98. // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
  99. // If the node has already existed, create will fail.
  100. // If any node on the path is a file, create will fail.
  101. func (s *store) Create(nodePath string, value string, unique bool,
  102. expireTime time.Time, index uint64, term uint64) (*Event, error) {
  103. nodePath = path.Clean(path.Join("/", nodePath))
  104. s.worldLock.Lock()
  105. defer s.worldLock.Unlock()
  106. e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create)
  107. if err == nil {
  108. s.Stats.Inc(CreateSuccess)
  109. } else {
  110. s.Stats.Inc(CreateFail)
  111. }
  112. return e, err
  113. }
  114. // Set function creates or replace the Node at nodePath.
  115. func (s *store) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  116. nodePath = path.Clean(path.Join("/", nodePath))
  117. s.worldLock.Lock()
  118. defer s.worldLock.Unlock()
  119. e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set)
  120. if err == nil {
  121. s.Stats.Inc(SetSuccess)
  122. } else {
  123. s.Stats.Inc(SetFail)
  124. }
  125. return e, err
  126. }
  127. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  128. value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  129. nodePath = path.Clean(path.Join("/", nodePath))
  130. s.worldLock.Lock()
  131. defer s.worldLock.Unlock()
  132. n, err := s.internalGet(nodePath, index, term)
  133. if err != nil {
  134. s.Stats.Inc(CompareAndSwapFail)
  135. return nil, err
  136. }
  137. if n.IsDir() { // can only test and set file
  138. s.Stats.Inc(CompareAndSwapFail)
  139. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  140. }
  141. // If both of the prevValue and prevIndex are given, we will test both of them.
  142. // Command will be executed, only if both of the tests are successful.
  143. if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
  144. e := newEvent(CompareAndSwap, nodePath, index, term)
  145. e.PrevValue = n.Value
  146. // if test succeed, write the value
  147. n.Write(value, index, term)
  148. n.UpdateTTL(expireTime)
  149. e.Value = value
  150. e.Expiration, e.TTL = n.ExpirationAndTTL()
  151. s.WatcherHub.notify(e)
  152. s.Stats.Inc(CompareAndSwapSuccess)
  153. return e, nil
  154. }
  155. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  156. s.Stats.Inc(CompareAndSwapFail)
  157. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term)
  158. }
  159. // Delete function deletes the node at the given path.
  160. // If the node is a directory, recursive must be true to delete it.
  161. func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
  162. nodePath = path.Clean(path.Join("/", nodePath))
  163. s.worldLock.Lock()
  164. defer s.worldLock.Unlock()
  165. n, err := s.internalGet(nodePath, index, term)
  166. if err != nil { // if the node does not exist, return error
  167. s.Stats.Inc(DeleteFail)
  168. return nil, err
  169. }
  170. e := newEvent(Delete, nodePath, index, term)
  171. if n.IsDir() {
  172. e.Dir = true
  173. } else {
  174. e.PrevValue = n.Value
  175. }
  176. callback := func(path string) { // notify function
  177. // notify the watchers with delted set true
  178. s.WatcherHub.notifyWatchers(e, path, true)
  179. }
  180. err = n.Remove(recursive, callback)
  181. if err != nil {
  182. s.Stats.Inc(DeleteFail)
  183. return nil, err
  184. }
  185. s.WatcherHub.notify(e)
  186. s.Stats.Inc(DeleteSuccess)
  187. return e, nil
  188. }
  189. func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
  190. prefix = path.Clean(path.Join("/", prefix))
  191. s.worldLock.RLock()
  192. defer s.worldLock.RUnlock()
  193. s.Index, s.Term = index, term
  194. var c <-chan *Event
  195. var err *etcdErr.Error
  196. if sinceIndex == 0 {
  197. c, err = s.WatcherHub.watch(prefix, recursive, index+1)
  198. } else {
  199. c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
  200. }
  201. if err != nil {
  202. err.Index = index
  203. err.Term = term
  204. return nil, err
  205. }
  206. return c, nil
  207. }
  208. // walk function walks all the nodePath and apply the walkFunc on each directory
  209. func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) {
  210. components := strings.Split(nodePath, "/")
  211. curr := s.Root
  212. var err *etcdErr.Error
  213. for i := 1; i < len(components); i++ {
  214. if len(components[i]) == 0 { // ignore empty string
  215. return curr, nil
  216. }
  217. curr, err = walkFunc(curr, components[i])
  218. if err != nil {
  219. return nil, err
  220. }
  221. }
  222. return curr, nil
  223. }
  224. // Update function updates the value/ttl of the node.
  225. // If the node is a file, the value and the ttl can be updated.
  226. // If the node is a directory, only the ttl can be updated.
  227. func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  228. s.worldLock.Lock()
  229. defer s.worldLock.Unlock()
  230. nodePath = path.Clean(path.Join("/", nodePath))
  231. n, err := s.internalGet(nodePath, index, term)
  232. if err != nil { // if the node does not exist, return error
  233. s.Stats.Inc(UpdateFail)
  234. return nil, err
  235. }
  236. e := newEvent(Update, nodePath, s.Index, s.Term)
  237. if len(newValue) != 0 {
  238. if n.IsDir() {
  239. // if the node is a directory, we cannot update value
  240. s.Stats.Inc(UpdateFail)
  241. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  242. }
  243. e.PrevValue = n.Value
  244. n.Write(newValue, index, term)
  245. }
  246. // update ttl
  247. n.UpdateTTL(expireTime)
  248. e.Value = newValue
  249. e.Expiration, e.TTL = n.ExpirationAndTTL()
  250. s.WatcherHub.notify(e)
  251. s.Stats.Inc(UpdateSuccess)
  252. return e, nil
  253. }
  254. func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
  255. expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
  256. s.Index, s.Term = index, term
  257. if unique { // append unique item under the node path
  258. nodePath += "/" + strconv.FormatUint(index, 10)
  259. }
  260. nodePath = path.Clean(path.Join("/", nodePath))
  261. dir, newNodeName := path.Split(nodePath)
  262. // walk through the nodePath, create dirs and get the last directory node
  263. d, err := s.walk(dir, s.checkDir)
  264. if err != nil {
  265. s.Stats.Inc(SetFail)
  266. err.Index, err.Term = s.Index, s.Term
  267. return nil, err
  268. }
  269. e := newEvent(action, nodePath, s.Index, s.Term)
  270. n, _ := d.GetChild(newNodeName)
  271. // force will try to replace a existing file
  272. if n != nil {
  273. if replace {
  274. if n.IsDir() {
  275. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  276. }
  277. e.PrevValue, _ = n.Read()
  278. n.Remove(false, nil)
  279. } else {
  280. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
  281. }
  282. }
  283. if len(value) != 0 { // create file
  284. e.Value = value
  285. n = newKV(s, nodePath, value, index, term, d, "", expireTime)
  286. } else { // create directory
  287. e.Dir = true
  288. n = newDir(s, nodePath, index, term, d, "", expireTime)
  289. }
  290. // we are sure d is a directory and does not have the children with name n.Name
  291. d.Add(n)
  292. // Node with TTL
  293. if expireTime.Sub(Permanent) != 0 {
  294. n.Expire()
  295. e.Expiration, e.TTL = n.ExpirationAndTTL()
  296. }
  297. s.WatcherHub.notify(e)
  298. return e, nil
  299. }
  300. // InternalGet function get the node of the given nodePath.
  301. func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) {
  302. nodePath = path.Clean(path.Join("/", nodePath))
  303. // update file system known index and term
  304. if index > s.Index {
  305. s.Index, s.Term = index, term
  306. }
  307. walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
  308. if !parent.IsDir() {
  309. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, index, term)
  310. return nil, err
  311. }
  312. child, ok := parent.Children[name]
  313. if ok {
  314. return child, nil
  315. }
  316. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), index, term)
  317. }
  318. f, err := s.walk(nodePath, walkFunc)
  319. if err != nil {
  320. return nil, err
  321. }
  322. return f, nil
  323. }
  324. // checkDir function will check whether the component is a directory under parent node.
  325. // If it is a directory, this function will return the pointer to that node.
  326. // If it does not exist, this function will create a new directory and return the pointer to that node.
  327. // If it is a file, this function will return error.
  328. func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
  329. node, ok := parent.Children[dirName]
  330. if ok {
  331. if node.IsDir() {
  332. return node, nil
  333. }
  334. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm)
  335. }
  336. n := newDir(s, path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)
  337. parent.Children[dirName] = n
  338. return n, nil
  339. }
  340. // Save function saves the static state of the store system.
  341. // Save function will not be able to save the state of watchers.
  342. // Save function will not save the parent field of the node. Or there will
  343. // be cyclic dependencies issue for the json package.
  344. func (s *store) Save() ([]byte, error) {
  345. s.worldLock.Lock()
  346. clonedStore := newStore()
  347. clonedStore.Index = s.Index
  348. clonedStore.Term = s.Term
  349. clonedStore.Root = s.Root.Clone()
  350. clonedStore.WatcherHub = s.WatcherHub.clone()
  351. clonedStore.Stats = s.Stats.clone()
  352. clonedStore.CurrentVersion = s.CurrentVersion
  353. s.worldLock.Unlock()
  354. b, err := json.Marshal(clonedStore)
  355. if err != nil {
  356. return nil, err
  357. }
  358. return b, nil
  359. }
  360. // recovery function recovery the store system from a static state.
  361. // It needs to recovery the parent field of the nodes.
  362. // It needs to delete the expired nodes since the saved time and also
  363. // need to create monitor go routines.
  364. func (s *store) Recovery(state []byte) error {
  365. s.worldLock.Lock()
  366. defer s.worldLock.Unlock()
  367. err := json.Unmarshal(state, s)
  368. if err != nil {
  369. return err
  370. }
  371. s.Root.recoverAndclean()
  372. return nil
  373. }
  374. func (s *store) JsonStats() []byte {
  375. s.Stats.Watchers = uint64(s.WatcherHub.count)
  376. return s.Stats.toJson()
  377. }