|
|
@@ -5,6 +5,7 @@ import (
|
|
|
"fmt"
|
|
|
"path"
|
|
|
"strconv"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
@@ -20,6 +21,14 @@ type Store struct {
|
|
|
// key-value store structure
|
|
|
Tree *tree
|
|
|
|
|
|
+ // This mutex protects everything except add watcher member.
|
|
|
+ // Add watch member does not depend on the current state of the store.
|
|
|
+ // And watch will return when other protected function is called and reach
|
|
|
+ // the watching condition.
|
|
|
+ // It is needed so that clone() can atomically replicate the Store
|
|
|
+ // and do the log snapshot in a go routine.
|
|
|
+ mutex sync.Mutex
|
|
|
+
|
|
|
// WatcherHub is where we register all the clients
|
|
|
// who issue a watch request
|
|
|
watcher *WatcherHub
|
|
|
@@ -136,9 +145,16 @@ func (s *Store) SetMessager(messager *chan string) {
|
|
|
s.messager = messager
|
|
|
}
|
|
|
|
|
|
-// Set the key to value with expiration time
|
|
|
func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
|
+ s.mutex.Lock()
|
|
|
+ defer s.mutex.Unlock()
|
|
|
+
|
|
|
+ return s.internalSet(key, value, expireTime, index)
|
|
|
+
|
|
|
+}
|
|
|
|
|
|
+// Set the key to value with expiration time
|
|
|
+func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
|
//Update index
|
|
|
s.Index = index
|
|
|
|
|
|
@@ -161,7 +177,7 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
|
|
|
// the key may be expired, we should not add the node
|
|
|
// also if the node exist, we need to delete the node
|
|
|
if isExpire && expireTime.Sub(time.Now()) < 0 {
|
|
|
- return s.Delete(key, index)
|
|
|
+ return s.internalDelete(key, index)
|
|
|
}
|
|
|
|
|
|
var TTL int64
|
|
|
@@ -290,6 +306,9 @@ func (s *Store) internalGet(key string) *Response {
|
|
|
// If key is a file return the file
|
|
|
// If key is a directory reuturn an array of files
|
|
|
func (s *Store) Get(key string) ([]byte, error) {
|
|
|
+ s.mutex.Lock()
|
|
|
+ defer s.mutex.Unlock()
|
|
|
+
|
|
|
resps, err := s.RawGet(key)
|
|
|
|
|
|
if err != nil {
|
|
|
@@ -309,9 +328,36 @@ func (s *Store) RawGet(key string) ([]*Response, error) {
|
|
|
|
|
|
key = path.Clean("/" + key)
|
|
|
|
|
|
- nodes, keys, dirs, ok := s.Tree.list(key)
|
|
|
+ nodes, keys, ok := s.Tree.list(key)
|
|
|
|
|
|
if ok {
|
|
|
+
|
|
|
+ node, ok := nodes.(*Node)
|
|
|
+
|
|
|
+ if ok {
|
|
|
+ resps := make([]*Response, 1)
|
|
|
+
|
|
|
+ isExpire := !node.ExpireTime.Equal(PERMANENT)
|
|
|
+
|
|
|
+ resps[0] = &Response{
|
|
|
+ Action: "GET",
|
|
|
+ Index: s.Index,
|
|
|
+ Key: key,
|
|
|
+ Value: node.Value,
|
|
|
+ }
|
|
|
+
|
|
|
+ // Update ttl
|
|
|
+ if isExpire {
|
|
|
+ TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
|
|
|
+ resps[0].Expiration = &node.ExpireTime
|
|
|
+ resps[0].TTL = TTL
|
|
|
+ }
|
|
|
+
|
|
|
+ return resps, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ nodes, _ := nodes.([]*Node)
|
|
|
+
|
|
|
resps := make([]*Response, len(nodes))
|
|
|
for i := 0; i < len(nodes); i++ {
|
|
|
|
|
|
@@ -326,7 +372,7 @@ func (s *Store) RawGet(key string) ([]*Response, error) {
|
|
|
Key: path.Join(key, keys[i]),
|
|
|
}
|
|
|
|
|
|
- if !dirs[i] {
|
|
|
+ if len(nodes[i].Value) != 0 {
|
|
|
resps[i].Value = nodes[i].Value
|
|
|
} else {
|
|
|
resps[i].Dir = true
|
|
|
@@ -348,8 +394,14 @@ func (s *Store) RawGet(key string) ([]*Response, error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
-// Delete the key
|
|
|
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
|
|
|
+ s.mutex.Lock()
|
|
|
+ defer s.mutex.Unlock()
|
|
|
+ return s.internalDelete(key, index)
|
|
|
+}
|
|
|
+
|
|
|
+// Delete the key
|
|
|
+func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
|
|
|
|
|
|
// Update stats
|
|
|
s.BasicStats.Deletes++
|
|
|
@@ -404,6 +456,9 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
|
|
|
|
|
|
// Set the value of the key to the value if the given prevValue is equal to the value of the key
|
|
|
func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
|
+ s.mutex.Lock()
|
|
|
+ defer s.mutex.Unlock()
|
|
|
+
|
|
|
// Update stats
|
|
|
s.BasicStats.TestAndSets++
|
|
|
|
|
|
@@ -417,7 +472,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim
|
|
|
if resp.Value == prevValue {
|
|
|
|
|
|
// If test success, do set
|
|
|
- return s.Set(key, value, expireTime, index)
|
|
|
+ return s.internalSet(key, value, expireTime, index)
|
|
|
} else {
|
|
|
|
|
|
// If fails, return err
|
|
|
@@ -452,6 +507,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime
|
|
|
return
|
|
|
|
|
|
} else {
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
|
s.Tree.delete(key)
|
|
|
|
|
|
@@ -462,6 +518,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime
|
|
|
Expiration: &node.ExpireTime,
|
|
|
Index: s.Index,
|
|
|
}
|
|
|
+ s.mutex.Unlock()
|
|
|
|
|
|
msg, err := json.Marshal(resp)
|
|
|
|
|
|
@@ -520,9 +577,34 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func (s *Store) clone() *Store {
|
|
|
+ newStore := &Store{
|
|
|
+ ResponseMaxSize: s.ResponseMaxSize,
|
|
|
+ ResponseCurrSize: s.ResponseCurrSize,
|
|
|
+ ResponseStartIndex: s.ResponseStartIndex,
|
|
|
+ Index: s.Index,
|
|
|
+ BasicStats: s.BasicStats,
|
|
|
+ }
|
|
|
+
|
|
|
+ newStore.Tree = s.Tree.clone()
|
|
|
+ newStore.ResponseMap = make(map[string]*Response)
|
|
|
+
|
|
|
+ for index, response := range s.ResponseMap {
|
|
|
+ newStore.ResponseMap[index] = response
|
|
|
+ }
|
|
|
+
|
|
|
+ return newStore
|
|
|
+}
|
|
|
+
|
|
|
// Save the current state of the storage system
|
|
|
func (s *Store) Save() ([]byte, error) {
|
|
|
- b, err := json.Marshal(s)
|
|
|
+ // first we clone the store
|
|
|
+ // json is very slow, we cannot hold the lock for such a long time
|
|
|
+ s.mutex.Lock()
|
|
|
+ cloneStore := s.clone()
|
|
|
+ s.mutex.Unlock()
|
|
|
+
|
|
|
+ b, err := json.Marshal(cloneStore)
|
|
|
if err != nil {
|
|
|
fmt.Println(err)
|
|
|
return nil, err
|
|
|
@@ -532,7 +614,8 @@ func (s *Store) Save() ([]byte, error) {
|
|
|
|
|
|
// Recovery the state of the stroage system from a previous state
|
|
|
func (s *Store) Recovery(state []byte) error {
|
|
|
-
|
|
|
+ s.mutex.Lock()
|
|
|
+ defer s.mutex.Unlock()
|
|
|
// we need to stop all the current watchers
|
|
|
// recovery will clear watcherHub
|
|
|
s.watcher.stopWatchers()
|