| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- package store
- import (
- "encoding/json"
- "fmt"
- "path"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
- etcdErr "github.com/coreos/etcd/error"
- )
- type Store struct {
- Root *Node
- WatcherHub *watcherHub
- Index uint64
- Term uint64
- Stats *Stats
- worldLock sync.RWMutex // stop the world lock
- }
- func New() *Store {
- s := new(Store)
- s.Root = newDir("/", UndefIndex, UndefTerm, nil, "", Permanent)
- s.Stats = newStats()
- s.WatcherHub = newWatchHub(1000)
- return s
- }
- // Get function returns a get event.
- // If recursive is true, it will return all the content under the node path.
- // If sorted is true, it will sort the content by keys.
- func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
- s.worldLock.RLock()
- defer s.worldLock.RUnlock()
- nodePath = path.Clean(path.Join("/", nodePath))
- n, err := s.internalGet(nodePath, index, term)
- if err != nil {
- s.Stats.Inc(GetFail)
- return nil, err
- }
- e := newEvent(Get, nodePath, index, term)
- if n.IsDir() { // node is a directory
- e.Dir = true
- children, _ := n.List()
- e.KVPairs = make([]KeyValuePair, len(children))
- // we do not use the index in the children slice directly
- // we need to skip the hidden one
- i := 0
- for _, child := range children {
- if child.IsHidden() { // get will not return hidden nodes
- continue
- }
- e.KVPairs[i] = child.Pair(recursive, sorted)
- i++
- }
- // eliminate hidden nodes
- e.KVPairs = e.KVPairs[:i]
- if sorted {
- sort.Sort(e.KVPairs)
- }
- } else { // node is a file
- e.Value, _ = n.Read()
- }
- e.Expiration, e.TTL = n.ExpirationAndTTL()
- s.Stats.Inc(GetSuccess)
- return e, nil
- }
- // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
- // If the node has already existed, create will fail.
- // If any node on the path is a file, create will fail.
- func (s *Store) Create(nodePath string, value string, incrementalSuffix bool, force bool,
- expireTime time.Time, index uint64, term uint64) (*Event, error) {
- nodePath = path.Clean(path.Join("/", nodePath))
- s.worldLock.Lock()
- defer s.worldLock.Unlock()
- return s.internalCreate(nodePath, value, incrementalSuffix, force, expireTime, index, term, Create)
- }
- // Update function updates the value/ttl of the node.
- // If the node is a file, the value and the ttl can be updated.
- // If the node is a directory, only the ttl can be updated.
- func (s *Store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
- s.worldLock.Lock()
- defer s.worldLock.Unlock()
- nodePath = path.Clean(path.Join("/", nodePath))
- n, err := s.internalGet(nodePath, index, term)
- if err != nil { // if the node does not exist, return error
- s.Stats.Inc(UpdateFail)
- return nil, err
- }
- e := newEvent(Update, nodePath, s.Index, s.Term)
- if len(newValue) != 0 {
- if n.IsDir() {
- // if the node is a directory, we cannot update value
- s.Stats.Inc(UpdateFail)
- return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
- }
- e.PrevValue = n.Value
- n.Write(newValue, index, term)
- }
- // update ttl
- n.UpdateTTL(expireTime, s)
- e.Expiration, e.TTL = n.ExpirationAndTTL()
- s.WatcherHub.notify(e)
- s.Stats.Inc(UpdateSuccess)
- return e, nil
- }
- func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
- value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
- nodePath = path.Clean(path.Join("/", nodePath))
- s.worldLock.Lock()
- defer s.worldLock.Unlock()
- if prevValue == "" && prevIndex == 0 { // try create
- return s.internalCreate(nodePath, value, false, false, expireTime, index, term, TestAndSet)
- }
- n, err := s.internalGet(nodePath, index, term)
- if err != nil {
- s.Stats.Inc(TestAndSetFail)
- return nil, err
- }
- if n.IsDir() { // can only test and set file
- s.Stats.Inc(TestAndSetFail)
- return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
- }
- if n.Value == prevValue || n.ModifiedIndex == prevIndex {
- e := newEvent(TestAndSet, nodePath, index, term)
- e.PrevValue = n.Value
- // if test succeed, write the value
- n.Write(value, index, term)
- n.UpdateTTL(expireTime, s)
- e.Value = value
- e.Expiration, e.TTL = n.ExpirationAndTTL()
- s.WatcherHub.notify(e)
- s.Stats.Inc(TestAndSetSuccess)
- return e, nil
- }
- cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
- s.Stats.Inc(TestAndSetFail)
- return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, index, term)
- }
- // Delete function deletes the node at the given path.
- // If the node is a directory, recursive must be true to delete it.
- func (s *Store) Delete(nodePath string, recursive bool, index uint64, term uint64) (*Event, error) {
- nodePath = path.Clean(path.Join("/", nodePath))
- s.worldLock.Lock()
- defer s.worldLock.Unlock()
- n, err := s.internalGet(nodePath, index, term)
- if err != nil { // if the node does not exist, return error
- s.Stats.Inc(DeleteFail)
- return nil, err
- }
- e := newEvent(Delete, nodePath, index, term)
- if n.IsDir() {
- e.Dir = true
- } else {
- e.PrevValue = n.Value
- }
- callback := func(path string) { // notify function
- // notify the watchers with delted set true
- s.WatcherHub.notifyWatchers(e, path, true)
- }
- err = n.Remove(recursive, callback)
- if err != nil {
- s.Stats.Inc(DeleteFail)
- return nil, err
- }
- s.WatcherHub.notify(e)
- s.Stats.Inc(DeleteSuccess)
- return e, nil
- }
- func (s *Store) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
- prefix = path.Clean(path.Join("/", prefix))
- s.worldLock.RLock()
- defer s.worldLock.RUnlock()
- s.Index, s.Term = index, term
- var c <-chan *Event
- var err *etcdErr.Error
- if sinceIndex == 0 {
- c, err = s.WatcherHub.watch(prefix, recursive, index+1)
- } else {
- c, err = s.WatcherHub.watch(prefix, recursive, sinceIndex)
- }
- if err != nil {
- err.Index = index
- err.Term = term
- return nil, err
- }
- return c, nil
- }
- // walk function walks all the nodePath and apply the walkFunc on each directory
- func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, *etcdErr.Error)) (*Node, *etcdErr.Error) {
- components := strings.Split(nodePath, "/")
- curr := s.Root
- var err *etcdErr.Error
- for i := 1; i < len(components); i++ {
- if len(components[i]) == 0 { // ignore empty string
- return curr, nil
- }
- curr, err = walkFunc(curr, components[i])
- if err != nil {
- return nil, err
- }
- }
- return curr, nil
- }
- func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, force bool,
- expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
- s.Index, s.Term = index, term
- if incrementalSuffix { // append unique incremental suffix to the node path
- nodePath += "_" + strconv.FormatUint(index, 10)
- }
- nodePath = path.Clean(path.Join("/", nodePath))
- dir, newNodeName := path.Split(nodePath)
- // walk through the nodePath, create dirs and get the last directory node
- d, err := s.walk(dir, s.checkDir)
- if err != nil {
- s.Stats.Inc(SetFail)
- err.Index, err.Term = s.Index, s.Term
- return nil, err
- }
- e := newEvent(action, nodePath, s.Index, s.Term)
- n, _ := d.GetChild(newNodeName)
- // force will try to replace a existing file
- if n != nil {
- if force {
- if n.IsDir() {
- return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, index, term)
- }
- e.PrevValue, _ = n.Read()
- n.Remove(false, nil)
- } else {
- return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, index, term)
- }
- }
- if len(value) != 0 { // create file
- e.Value = value
- n = newKV(nodePath, value, index, term, d, "", expireTime)
- } else { // create directory
- e.Dir = true
- n = newDir(nodePath, index, term, d, "", expireTime)
- }
- err = d.Add(n)
- if err != nil {
- s.Stats.Inc(SetFail)
- return nil, err
- }
- // Node with TTL
- if expireTime.Sub(Permanent) != 0 {
- n.Expire(s)
- e.Expiration, e.TTL = n.ExpirationAndTTL()
- }
- s.WatcherHub.notify(e)
- s.Stats.Inc(SetSuccess)
- return e, nil
- }
- // InternalGet function get the node of the given nodePath.
- func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, *etcdErr.Error) {
- nodePath = path.Clean(path.Join("/", nodePath))
- // update file system known index and term
- if index > s.Index {
- s.Index, s.Term = index, term
- }
- walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
- if !parent.IsDir() {
- err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, index, term)
- return nil, err
- }
- child, ok := parent.Children[name]
- if ok {
- return child, nil
- }
- return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), index, term)
- }
- f, err := s.walk(nodePath, walkFunc)
- if err != nil {
- return nil, err
- }
- return f, nil
- }
- // checkDir function will check whether the component is a directory under parent node.
- // If it is a directory, this function will return the pointer to that node.
- // If it does not exist, this function will create a new directory and return the pointer to that node.
- // If it is a file, this function will return error.
- func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) {
- node, ok := parent.Children[dirName]
- if ok {
- if node.IsDir() {
- return node, nil
- }
- return nil, etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, UndefIndex, UndefTerm)
- }
- n := newDir(path.Join(parent.Path, dirName), s.Index, s.Term, parent, parent.ACL, Permanent)
- parent.Children[dirName] = n
- return n, nil
- }
- // Save function saves the static state of the store system.
- // Save function will not be able to save the state of watchers.
- // Save function will not save the parent field of the node. Or there will
- // be cyclic dependencies issue for the json package.
- func (s *Store) Save() ([]byte, error) {
- s.worldLock.Lock()
- clonedStore := New()
- clonedStore.Index = s.Index
- clonedStore.Term = s.Term
- clonedStore.Root = s.Root.Clone()
- clonedStore.WatcherHub = s.WatcherHub.clone()
- clonedStore.Stats = s.Stats.clone()
- s.worldLock.Unlock()
- b, err := json.Marshal(clonedStore)
- if err != nil {
- return nil, err
- }
- return b, nil
- }
- // recovery function recovery the store system from a static state.
- // It needs to recovery the parent field of the nodes.
- // It needs to delete the expired nodes since the saved time and also
- // need to create monitor go routines.
- func (s *Store) Recovery(state []byte) error {
- s.worldLock.Lock()
- defer s.worldLock.Unlock()
- err := json.Unmarshal(state, s)
- if err != nil {
- return err
- }
- s.Root.recoverAndclean(s)
- return nil
- }
- func (s *Store) JsonStats() []byte {
- s.Stats.Watchers = uint64(s.WatcherHub.count)
- return s.Stats.toJson()
- }
|