store.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. package store
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "path"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. etcdErr "github.com/coreos/etcd/error"
  12. )
  13. type Store interface {
  14. Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
  15. Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
  16. Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
  17. Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time,
  18. index uint64, term uint64) (*Event, error)
  19. CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  20. value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
  21. Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error)
  22. Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error)
  23. Save() ([]byte, error)
  24. Recovery(state []byte) error
  25. JsonStats() []byte
  26. }
  27. type store struct {
  28. Root *Node
  29. WatcherHub *watcherHub
  30. Index uint64
  31. Term uint64
  32. Stats *Stats
  33. worldLock sync.RWMutex // stop the world lock
  34. }
  35. func New() Store {
  36. return newStore()
  37. }
  38. func newStore() *store {
  39. s := new(store)
  40. s.Root = newDir(s, "/", UndefIndex, UndefTerm, nil, "", Permanent)
  41. s.Stats = newStats()
  42. s.WatcherHub = newWatchHub(1000)
  43. return s
  44. }
  45. // Get function returns a get event.
  46. // If recursive is true, it will return all the content under the node path.
  47. // If sorted is true, it will sort the content by keys.
  48. func (s *store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
  49. s.worldLock.RLock()
  50. defer s.worldLock.RUnlock()
  51. nodePath = path.Clean(path.Join("/", nodePath))
  52. n, err := s.internalGet(nodePath, index, term)
  53. if err != nil {
  54. s.Stats.Inc(GetFail)
  55. return nil, err
  56. }
  57. e := newEvent(Get, nodePath, index, term)
  58. if n.IsDir() { // node is a directory
  59. e.Dir = true
  60. children, _ := n.List()
  61. e.KVPairs = make([]KeyValuePair, len(children))
  62. // we do not use the index in the children slice directly
  63. // we need to skip the hidden one
  64. i := 0
  65. for _, child := range children {
  66. if child.IsHidden() { // get will not return hidden nodes
  67. continue
  68. }
  69. e.KVPairs[i] = child.Pair(recursive, sorted)
  70. i++
  71. }
  72. // eliminate hidden nodes
  73. e.KVPairs = e.KVPairs[:i]
  74. if sorted {
  75. sort.Sort(e.KVPairs)
  76. }
  77. } else { // node is a file
  78. e.Value, _ = n.Read()
  79. }
  80. e.Expiration, e.TTL = n.ExpirationAndTTL()
  81. s.Stats.Inc(GetSuccess)
  82. return e, nil
  83. }
  84. // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
  85. // If the node has already existed, create will fail.
  86. // If any node on the path is a file, create will fail.
  87. func (s *store) Create(nodePath string, value string, unique bool,
  88. expireTime time.Time, index uint64, term uint64) (*Event, error) {
  89. nodePath = path.Clean(path.Join("/", nodePath))
  90. s.worldLock.Lock()
  91. defer s.worldLock.Unlock()
  92. e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create)
  93. if err == nil {
  94. s.Stats.Inc(CreateSuccess)
  95. } else {
  96. s.Stats.Inc(CreateFail)
  97. }
  98. return e, err
  99. }
  100. // Set function creates or replace the Node at nodePath.
  101. func (s *store) Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  102. nodePath = path.Clean(path.Join("/", nodePath))
  103. s.worldLock.Lock()
  104. defer s.worldLock.Unlock()
  105. e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set)
  106. if err == nil {
  107. s.Stats.Inc(SetSuccess)
  108. } else {
  109. s.Stats.Inc(SetFail)
  110. }
  111. return e, err
  112. }
  113. func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  114. value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  115. nodePath = path.Clean(path.Join("/", nodePath))
  116. s.worldLock.Lock()
  117. defer s.worldLock.Unlock()
  118. n, err := s.internalGet(nodePath, index, term)
  119. if err != nil {
  120. s.Stats.Inc(CompareAndSwapFail)
  121. return nil, err
  122. }
  123. if n.IsDir() { // can only test and set file
  124. s.Stats.Inc(CompareAndSwapFail)
  125. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  126. }
  127. // If both of the prevValue and prevIndex are given, we will test both of them.
  128. // Command will be executed, only if both of the tests are successful.
  129. if (prevValue == "" || n.Value == prevValue) && (prevIndex == 0 || n.ModifiedIndex == prevIndex) {
  130. e := newEvent(CompareAndSwap, nodePath, index, term)
  131. e.PrevValue = n.Value
  132. // if test succeed, write the value
  133. n.Write(value, index, term)
  134. n.UpdateTTL(expireTime)
  135. e.Value = value
  136. e.Expiration, e.TTL = n.ExpirationAndTTL()
  137. s.WatcherHub.notify(e)
  138. s.Stats.Inc(CompareAndSwapSuccess)
  139. return e, nil
  140. }
  141. cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  142. s.Stats.Inc(CompareAndSwapFail)
  143. return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term)
  144. }
  145. // Delete function deletes the node at the given path.
  146. // If the node is a directory, recursive must be true to delete it.
  147. func (s *store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
  148. nodePath = path.Clean(path.Join("/", nodePath))
  149. s.worldLock.Lock()
  150. defer s.worldLock.Unlock()
  151. n, err := s.internalGet(nodePath, index, term)
  152. if err != nil { // if the node does not exist, return error
  153. s.Stats.Inc(DeleteFail)
  154. return nil, err
  155. }
  156. e := newEvent(Delete, nodePath, index, term)
  157. if n.IsDir() {
  158. e.Dir = true
  159. } else {
  160. e.PrevValue = n.Value
  161. }
  162. callback := func(path string) { // notify function
  163. // notify the watchers with delted set true
  164. s.WatcherHub.notifyWatchers(e, path, true)
  165. }
  166. err = n.Remove(recursive, callback)
  167. if err != nil {
  168. s.Stats.Inc(DeleteFail)
  169. return nil, err
  170. }
  171. s.WatcherHub.notify(e)
  172. s.Stats.Inc(DeleteSuccess)
  173. return e, nil
  174. }
  175. func (s *store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
  176. prefix = path.Clean(path.Join("/", prefix))
  177. s.worldLock.RLock()
  178. defer s.worldLock.RUnlock()
  179. s.Index, s.Term = index, term
  180. var c <-chan *Event
  181. var err *etcdErr.Error
  182. if sinceIndex == 0 {
  183. c, err = s.WatcherHub.watch(prefix, recursive, index+1)
  184. } else {
  185. c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
  186. }
  187. if err != nil {
  188. err.Index = index
  189. err.Term = term
  190. return nil, err
  191. }
  192. return c, nil
  193. }
  194. // walk function walks all the nodePath and apply the walkFunc on each directory
  195. func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) {
  196. components := strings.Split(nodePath, "/")
  197. curr := s.Root
  198. var err *etcdErr.Error
  199. for i := 1; i < len(components); i++ {
  200. if len(components[i]) == 0 { // ignore empty string
  201. return curr, nil
  202. }
  203. curr, err = walkFunc(curr, components[i])
  204. if err != nil {
  205. return nil, err
  206. }
  207. }
  208. return curr, nil
  209. }
  210. // Update function updates the value/ttl of the node.
  211. // If the node is a file, the value and the ttl can be updated.
  212. // If the node is a directory, only the ttl can be updated.
  213. func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  214. s.worldLock.Lock()
  215. defer s.worldLock.Unlock()
  216. nodePath = path.Clean(path.Join("/", nodePath))
  217. n, err := s.internalGet(nodePath, index, term)
  218. if err != nil { // if the node does not exist, return error
  219. s.Stats.Inc(UpdateFail)
  220. return nil, err
  221. }
  222. e := newEvent(Update, nodePath, s.Index, s.Term)
  223. if len(newValue) != 0 {
  224. if n.IsDir() {
  225. // if the node is a directory, we cannot update value
  226. s.Stats.Inc(UpdateFail)
  227. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  228. }
  229. e.PrevValue = n.Value
  230. n.Write(newValue, index, term)
  231. }
  232. // update ttl
  233. n.UpdateTTL(expireTime)
  234. e.Value = newValue
  235. e.Expiration, e.TTL = n.ExpirationAndTTL()
  236. s.WatcherHub.notify(e)
  237. s.Stats.Inc(UpdateSuccess)
  238. return e, nil
  239. }
  240. func (s *store) internalCreate(nodePath string, value string, unique bool, replace bool,
  241. expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
  242. s.Index, s.Term = index, term
  243. if unique { // append unique item under the node path
  244. nodePath += "/" + strconv.FormatUint(index, 10)
  245. }
  246. nodePath = path.Clean(path.Join("/", nodePath))
  247. dir, newNodeName := path.Split(nodePath)
  248. // walk through the nodePath, create dirs and get the last directory node
  249. d, err := s.walk(dir, s.checkDir)
  250. if err != nil {
  251. s.Stats.Inc(SetFail)
  252. err.Index, err.Term = s.Index, s.Term
  253. return nil, err
  254. }
  255. e := newEvent(action, nodePath, s.Index, s.Term)
  256. n, _ := d.GetChild(newNodeName)
  257. // force will try to replace a existing file
  258. if n != nil {
  259. if replace {
  260. if n.IsDir() {
  261. return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
  262. }
  263. e.PrevValue, _ = n.Read()
  264. n.Remove(false, nil)
  265. } else {
  266. return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
  267. }
  268. }
  269. if len(value) != 0 { // create file
  270. e.Value = value
  271. n = newKV(s, nodePath, value, index, term, d, "", expireTime)
  272. } else { // create directory
  273. e.Dir = true
  274. n = newDir(s, nodePath, index, term, d, "", expireTime)
  275. }
  276. // we are sure d is a directory and does not have the children with name n.Name
  277. d.Add(n)
  278. // Node with TTL
  279. if expireTime.Sub(Permanent) != 0 {
  280. n.Expire()
  281. e.Expiration, e.TTL = n.ExpirationAndTTL()
  282. }
  283. s.WatcherHub.notify(e)
  284. return e, nil
  285. }
  286. // InternalGet function get the node of the given nodePath.
  287. func (s *store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) {
  288. nodePath = path.Clean(path.Join("/", nodePath))
  289. // update file system known index and term
  290. if index > s.Index {
  291. s.Index, s.Term = index, term
  292. }
  293. walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
  294. if !parent.IsDir() {
  295. err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, index, term)
  296. return nil, err
  297. }
  298. child, ok := parent.Children[name]
  299. if ok {
  300. return child, nil
  301. }
  302. return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), index, term)
  303. }
  304. f, err := s.walk(nodePath, walkFunc)
  305. if err != nil {
  306. return nil, err
  307. }
  308. return f, nil
  309. }
  310. // checkDir function will check whether the component is a directory under parent node.
  311. // If it is a directory, this function will return the pointer to that node.
  312. // If it does not exist, this function will create a new directory and return the pointer to that node.
  313. // If it is a file, this function will return error.
  314. func (s *store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
  315. node, ok := parent.Children[dirName]
  316. if ok {
  317. if node.IsDir() {
  318. return node, nil
  319. }
  320. return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm)
  321. }
  322. n := newDir(s, path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)
  323. parent.Children[dirName] = n
  324. return n, nil
  325. }
  326. // Save function saves the static state of the store system.
  327. // Save function will not be able to save the state of watchers.
  328. // Save function will not save the parent field of the node. Or there will
  329. // be cyclic dependencies issue for the json package.
  330. func (s *store) Save() ([]byte, error) {
  331. s.worldLock.Lock()
  332. clonedStore := newStore()
  333. clonedStore.Index = s.Index
  334. clonedStore.Term = s.Term
  335. clonedStore.Root = s.Root.Clone()
  336. clonedStore.WatcherHub = s.WatcherHub.clone()
  337. clonedStore.Stats = s.Stats.clone()
  338. s.worldLock.Unlock()
  339. b, err := json.Marshal(clonedStore)
  340. if err != nil {
  341. return nil, err
  342. }
  343. return b, nil
  344. }
  345. // recovery function recovery the store system from a static state.
  346. // It needs to recovery the parent field of the nodes.
  347. // It needs to delete the expired nodes since the saved time and also
  348. // need to create monitor go routines.
  349. func (s *store) Recovery(state []byte) error {
  350. s.worldLock.Lock()
  351. defer s.worldLock.Unlock()
  352. err := json.Unmarshal(state, s)
  353. if err != nil {
  354. return err
  355. }
  356. s.Root.recoverAndclean()
  357. return nil
  358. }
  359. func (s *store) JsonStats() []byte {
  360. s.Stats.Watchers = uint64(s.WatcherHub.count)
  361. return s.Stats.toJson()
  362. }