file_system.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package fileSystem
  2. import (
  3. "fmt"
  4. "path"
  5. "strings"
  6. "time"
  7. etcdErr "github.com/coreos/etcd/error"
  8. )
  9. type FileSystem struct {
  10. Root *Node
  11. EventHistory *EventHistory
  12. WatcherHub *watcherHub
  13. Index uint64
  14. Term uint64
  15. }
  16. func New() *FileSystem {
  17. return &FileSystem{
  18. Root: newDir("/", 0, 0, nil, "", Permanent),
  19. WatcherHub: newWatchHub(1000),
  20. }
  21. }
  22. func (fs *FileSystem) Get(nodePath string, recusive bool, index uint64, term uint64) (*Event, error) {
  23. n, err := fs.InternalGet(nodePath, index, term)
  24. if err != nil {
  25. return nil, err
  26. }
  27. e := newEvent(Get, nodePath, index, term)
  28. if n.IsDir() { // node is dir
  29. e.Dir = true
  30. children, _ := n.List()
  31. e.KVPairs = make([]KeyValuePair, len(children))
  32. // we do not use the index in the children slice directly
  33. // we need to skip the hidden one
  34. i := 0
  35. for _, child := range children {
  36. if child.IsHidden() { // get will not list hidden node
  37. continue
  38. }
  39. e.KVPairs[i] = child.Pair(recusive)
  40. i++
  41. }
  42. // eliminate hidden nodes
  43. e.KVPairs = e.KVPairs[:i]
  44. } else { // node is file
  45. e.Value = n.Value
  46. }
  47. return e, nil
  48. }
  49. // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
  50. // If the node has already existed, create will fail.
  51. // If any node on the path is a file, create will fail.
  52. func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  53. nodePath = path.Clean("/" + nodePath)
  54. // make sure we can create the node
  55. _, err := fs.InternalGet(nodePath, index, term)
  56. if err == nil { // key already exists
  57. return nil, etcdErr.NewError(105, nodePath)
  58. }
  59. etcdError, _ := err.(etcdErr.Error)
  60. if etcdError.ErrorCode == 104 { // we cannot create the key due to meet a file while walking
  61. return nil, err
  62. }
  63. dir, _ := path.Split(nodePath)
  64. // walk through the nodePath, create dirs and get the last directory node
  65. d, err := fs.walk(dir, fs.checkDir)
  66. if err != nil {
  67. return nil, err
  68. }
  69. e := newEvent(Create, nodePath, fs.Index, fs.Term)
  70. var n *Node
  71. if len(value) != 0 { // create file
  72. e.Value = value
  73. n = newFile(nodePath, value, fs.Index, fs.Term, d, "", expireTime)
  74. } else { // create directory
  75. e.Dir = true
  76. n = newDir(nodePath, fs.Index, fs.Term, d, "", expireTime)
  77. }
  78. err = d.Add(n)
  79. if err != nil {
  80. return nil, err
  81. }
  82. // Node with TTL
  83. if expireTime != Permanent {
  84. go n.Expire()
  85. e.Expiration = &n.ExpireTime
  86. e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  87. }
  88. fs.WatcherHub.notify(e)
  89. return e, nil
  90. }
  91. // Update function updates the value/ttl of the node.
  92. // If the node is a file, the value and the ttl can be updated.
  93. // If the node is a directory, only the ttl can be updated.
  94. func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  95. n, err := fs.InternalGet(nodePath, index, term)
  96. if err != nil { // if the node does not exist, return error
  97. return nil, err
  98. }
  99. e := newEvent(Update, nodePath, fs.Index, fs.Term)
  100. if n.IsDir() { // if the node is a directory, we can only update ttl
  101. if len(value) != 0 {
  102. return nil, etcdErr.NewError(102, nodePath)
  103. }
  104. } else { // if the node is a file, we can update value and ttl
  105. e.PrevValue = n.Value
  106. if len(value) != 0 {
  107. e.Value = value
  108. }
  109. n.Write(value, index, term)
  110. }
  111. // update ttl
  112. if n.ExpireTime != Permanent && expireTime != Permanent {
  113. n.stopExpire <- true
  114. }
  115. if expireTime != Permanent {
  116. go n.Expire()
  117. e.Expiration = &n.ExpireTime
  118. e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
  119. }
  120. fs.WatcherHub.notify(e)
  121. return e, nil
  122. }
  123. func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
  124. value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
  125. f, err := fs.InternalGet(nodePath, index, term)
  126. if err != nil {
  127. return nil, err
  128. }
  129. if f.IsDir() { // can only test and set file
  130. return nil, etcdErr.NewError(102, nodePath)
  131. }
  132. if f.Value == prevValue || f.ModifiedIndex == prevIndex {
  133. // if test succeed, write the value
  134. e := newEvent(TestAndSet, nodePath, index, term)
  135. e.PrevValue = f.Value
  136. e.Value = value
  137. f.Write(value, index, term)
  138. fs.WatcherHub.notify(e)
  139. return e, nil
  140. }
  141. cause := fmt.Sprintf("[%v/%v] [%v/%v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
  142. return nil, etcdErr.NewError(101, cause)
  143. }
  144. // Delete function deletes the node at the given path.
  145. // If the node is a directory, recursive must be true to delete it.
  146. func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
  147. n, err := fs.InternalGet(nodePath, index, term)
  148. if err != nil { // if the node does not exist, return error
  149. return nil, err
  150. }
  151. e := newEvent(Delete, nodePath, index, term)
  152. if n.IsDir() {
  153. e.Dir = true
  154. } else {
  155. e.PrevValue = n.Value
  156. }
  157. callback := func(path string) { // notify function
  158. fs.WatcherHub.notifyWithPath(e, path, true)
  159. }
  160. err = n.Remove(recursive, callback)
  161. if err != nil {
  162. return nil, err
  163. }
  164. fs.WatcherHub.notify(e)
  165. return e, nil
  166. }
  167. // walk function walks all the nodePath and apply the walkFunc on each directory
  168. func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
  169. components := strings.Split(nodePath, "/")
  170. curr := fs.Root
  171. var err error
  172. for i := 1; i < len(components); i++ {
  173. if len(components[i]) == 0 { // ignore empty string
  174. return curr, nil
  175. }
  176. curr, err = walkFunc(curr, components[i])
  177. if err != nil {
  178. return nil, err
  179. }
  180. }
  181. return curr, nil
  182. }
  183. // InternalGet function get the node of the given nodePath.
  184. func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) {
  185. nodePath = path.Clean("/" + nodePath)
  186. // update file system known index and term
  187. fs.Index, fs.Term = index, term
  188. walkFunc := func(parent *Node, name string) (*Node, error) {
  189. if !parent.IsDir() {
  190. return nil, etcdErr.NewError(104, parent.Path)
  191. }
  192. child, ok := parent.Children[name]
  193. if ok {
  194. return child, nil
  195. }
  196. return nil, etcdErr.NewError(100, path.Join(parent.Path, name))
  197. }
  198. f, err := fs.walk(nodePath, walkFunc)
  199. if err != nil {
  200. return nil, err
  201. }
  202. return f, nil
  203. }
  204. // checkDir function will check whether the component is a directory under parent node.
  205. // If it is a directory, this function will return the pointer to that node.
  206. // If it does not exist, this function will create a new directory and return the pointer to that node.
  207. // If it is a file, this function will return error.
  208. func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) {
  209. subDir, ok := parent.Children[dirName]
  210. if ok {
  211. return subDir, nil
  212. }
  213. n := newDir(path.Join(parent.Path, dirName), fs.Index, fs.Term, parent, parent.ACL, Permanent)
  214. parent.Children[dirName] = n
  215. return n, nil
  216. }