|
@@ -6,6 +6,7 @@ import (
|
|
|
"path"
|
|
"path"
|
|
|
"strconv"
|
|
"strconv"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
+ "sync"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
|
|
@@ -20,6 +21,8 @@ type Store struct {
|
|
|
// key-value store structure
|
|
// key-value store structure
|
|
|
Tree *tree
|
|
Tree *tree
|
|
|
|
|
|
|
|
|
|
+ mutex sync.Mutex
|
|
|
|
|
+
|
|
|
// WatcherHub is where we register all the clients
|
|
// WatcherHub is where we register all the clients
|
|
|
// who issue a watch request
|
|
// who issue a watch request
|
|
|
watcher *WatcherHub
|
|
watcher *WatcherHub
|
|
@@ -136,9 +139,16 @@ func (s *Store) SetMessager(messager *chan string) {
|
|
|
s.messager = messager
|
|
s.messager = messager
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// Set the key to value with expiration time
|
|
|
|
|
func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
+ defer s.mutex.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ return s.internalSet(key, value, expireTime, index)
|
|
|
|
|
+
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
|
|
+// Set the key to value with expiration time
|
|
|
|
|
+func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
|
//Update index
|
|
//Update index
|
|
|
s.Index = index
|
|
s.Index = index
|
|
|
|
|
|
|
@@ -161,7 +171,7 @@ func (s *Store) Set(key string, value string, expireTime time.Time, index uint64
|
|
|
// the key may be expired, we should not add the node
|
|
// the key may be expired, we should not add the node
|
|
|
// also if the node exist, we need to delete the node
|
|
// also if the node exist, we need to delete the node
|
|
|
if isExpire && expireTime.Sub(time.Now()) < 0 {
|
|
if isExpire && expireTime.Sub(time.Now()) < 0 {
|
|
|
- return s.Delete(key, index)
|
|
|
|
|
|
|
+ return s.internalDelete(key, index)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var TTL int64
|
|
var TTL int64
|
|
@@ -290,6 +300,9 @@ func (s *Store) internalGet(key string) *Response {
|
|
|
// If key is a file return the file
|
|
// If key is a file return the file
|
|
|
// If key is a directory reuturn an array of files
|
|
// If key is a directory reuturn an array of files
|
|
|
func (s *Store) Get(key string) ([]byte, error) {
|
|
func (s *Store) Get(key string) ([]byte, error) {
|
|
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
+ defer s.mutex.Unlock()
|
|
|
|
|
+
|
|
|
resps, err := s.RawGet(key)
|
|
resps, err := s.RawGet(key)
|
|
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -312,28 +325,49 @@ func (s *Store) RawGet(key string) ([]*Response, error) {
|
|
|
nodes, keys, dirs, ok := s.Tree.list(key)
|
|
nodes, keys, dirs, ok := s.Tree.list(key)
|
|
|
|
|
|
|
|
if ok {
|
|
if ok {
|
|
|
|
|
+
|
|
|
|
|
+ node, ok := nodes.(*Node)
|
|
|
|
|
+
|
|
|
|
|
+ if ok {
|
|
|
|
|
+ resps := make([]*Response, 1)
|
|
|
|
|
+
|
|
|
|
|
+ isExpire := !node.ExpireTime.Equal(PERMANENT)
|
|
|
|
|
+
|
|
|
|
|
+ resps[0] = &Response{
|
|
|
|
|
+ Action: "GET",
|
|
|
|
|
+ Index: s.Index,
|
|
|
|
|
+ Key: key,
|
|
|
|
|
+ Value: node.Value,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Update ttl
|
|
|
|
|
+ if isExpire {
|
|
|
|
|
+ TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
|
|
|
|
|
+ resps[0].Expiration = &node.ExpireTime
|
|
|
|
|
+ resps[0].TTL = TTL
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return resps, nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ nodes, _ := nodes.([]*Node)
|
|
|
|
|
+
|
|
|
resps := make([]*Response, len(nodes))
|
|
resps := make([]*Response, len(nodes))
|
|
|
for i := 0; i < len(nodes); i++ {
|
|
for i := 0; i < len(nodes); i++ {
|
|
|
|
|
|
|
|
var TTL int64
|
|
var TTL int64
|
|
|
var isExpire bool = false
|
|
var isExpire bool = false
|
|
|
- var thisKey string
|
|
|
|
|
|
|
|
|
|
isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
|
|
isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
|
|
|
|
|
|
|
|
- if keys != nil {
|
|
|
|
|
- thisKey = path.Join(key, keys[i])
|
|
|
|
|
- } else {
|
|
|
|
|
- thisKey = key
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
resps[i] = &Response{
|
|
resps[i] = &Response{
|
|
|
Action: "GET",
|
|
Action: "GET",
|
|
|
Index: s.Index,
|
|
Index: s.Index,
|
|
|
- Key: thisKey,
|
|
|
|
|
|
|
+ Key: path.Join(key, keys[i]),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if dirs == nil || !dirs[i] {
|
|
|
|
|
|
|
+ if !dirs[i] {
|
|
|
resps[i].Value = nodes[i].Value
|
|
resps[i].Value = nodes[i].Value
|
|
|
} else {
|
|
} else {
|
|
|
resps[i].Dir = true
|
|
resps[i].Dir = true
|
|
@@ -355,8 +389,14 @@ func (s *Store) RawGet(key string) ([]*Response, error) {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// Delete the key
|
|
|
|
|
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
|
|
func (s *Store) Delete(key string, index uint64) ([]byte, error) {
|
|
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
+ defer s.mutex.Unlock()
|
|
|
|
|
+ return s.internalDelete(key, index)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Delete the key
|
|
|
|
|
+func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
|
|
|
|
|
|
|
|
// Update stats
|
|
// Update stats
|
|
|
s.BasicStats.Deletes++
|
|
s.BasicStats.Deletes++
|
|
@@ -411,6 +451,9 @@ func (s *Store) Delete(key string, index uint64) ([]byte, error) {
|
|
|
|
|
|
|
|
// Set the value of the key to the value if the given prevValue is equal to the value of the key
|
|
// Set the value of the key to the value if the given prevValue is equal to the value of the key
|
|
|
func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
|
|
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
+ defer s.mutex.Unlock()
|
|
|
|
|
+
|
|
|
// Update stats
|
|
// Update stats
|
|
|
s.BasicStats.TestAndSets++
|
|
s.BasicStats.TestAndSets++
|
|
|
|
|
|
|
@@ -424,7 +467,7 @@ func (s *Store) TestAndSet(key string, prevValue string, value string, expireTim
|
|
|
if resp.Value == prevValue {
|
|
if resp.Value == prevValue {
|
|
|
|
|
|
|
|
// If test success, do set
|
|
// If test success, do set
|
|
|
- return s.Set(key, value, expireTime, index)
|
|
|
|
|
|
|
+ return s.internalSet(key, value, expireTime, index)
|
|
|
} else {
|
|
} else {
|
|
|
|
|
|
|
|
// If fails, return err
|
|
// If fails, return err
|
|
@@ -459,6 +502,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
} else {
|
|
} else {
|
|
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
|
|
|
s.Tree.delete(key)
|
|
s.Tree.delete(key)
|
|
|
|
|
|
|
@@ -469,6 +513,7 @@ func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime
|
|
|
Expiration: &node.ExpireTime,
|
|
Expiration: &node.ExpireTime,
|
|
|
Index: s.Index,
|
|
Index: s.Index,
|
|
|
}
|
|
}
|
|
|
|
|
+ s.mutex.Unlock()
|
|
|
|
|
|
|
|
msg, err := json.Marshal(resp)
|
|
msg, err := json.Marshal(resp)
|
|
|
|
|
|
|
@@ -527,9 +572,32 @@ func (s *Store) addToResponseMap(index uint64, resp *Response) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (s *Store) clone() *Store {
|
|
|
|
|
+ newStore := & Store{
|
|
|
|
|
+ ResponseMaxSize: s.ResponseMaxSize,
|
|
|
|
|
+ ResponseCurrSize: s.ResponseCurrSize,
|
|
|
|
|
+ ResponseStartIndex: s.ResponseStartIndex,
|
|
|
|
|
+ Index: s.Index,
|
|
|
|
|
+ BasicStats: s.BasicStats,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ newStore.Tree = s.Tree.clone()
|
|
|
|
|
+ newStore.ResponseMap = make(map[string]*Response)
|
|
|
|
|
+
|
|
|
|
|
+ for index, response := range s.ResponseMap {
|
|
|
|
|
+ newStore.ResponseMap[index] = response
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return newStore
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// Save the current state of the storage system
|
|
// Save the current state of the storage system
|
|
|
func (s *Store) Save() ([]byte, error) {
|
|
func (s *Store) Save() ([]byte, error) {
|
|
|
- b, err := json.Marshal(s)
|
|
|
|
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
+ cloneStore := s.clone()
|
|
|
|
|
+ s.mutex.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ b, err := json.Marshal(cloneStore)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
fmt.Println(err)
|
|
fmt.Println(err)
|
|
|
return nil, err
|
|
return nil, err
|
|
@@ -539,7 +607,8 @@ func (s *Store) Save() ([]byte, error) {
|
|
|
|
|
|
|
|
// Recovery the state of the stroage system from a previous state
|
|
// Recovery the state of the stroage system from a previous state
|
|
|
func (s *Store) Recovery(state []byte) error {
|
|
func (s *Store) Recovery(state []byte) error {
|
|
|
-
|
|
|
|
|
|
|
+ s.mutex.Lock()
|
|
|
|
|
+ defer s.mutex.Unlock()
|
|
|
// we need to stop all the current watchers
|
|
// we need to stop all the current watchers
|
|
|
// recovery will clear watcherHub
|
|
// recovery will clear watcherHub
|
|
|
s.watcher.stopWatchers()
|
|
s.watcher.stopWatchers()
|