| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662 |
- package store
- import (
- "encoding/json"
- "fmt"
- etcdErr "github.com/coreos/etcd/error"
- "path"
- "strconv"
- "sync"
- "time"
- )
- //------------------------------------------------------------------------------
- //
- // Typedefs
- //
- //------------------------------------------------------------------------------
- // The main struct of the Key-Value store
- type Store struct {
- // key-value store structure
- Tree *tree
- // This mutex protects everything except add watcher member.
- // Add watch member does not depend on the current state of the store.
- // And watch will return when other protected function is called and reach
- // the watching condition.
- // It is needed so that clone() can atomically replicate the Store
- // and do the log snapshot in a go routine.
- mutex sync.RWMutex
- // WatcherHub is where we register all the clients
- // who issue a watch request
- watcher *WatcherHub
- // The string channel to send messages to the outside world
- // Now we use it to send changes to the hub of the web service
- messager chan<- string
- // A map to keep the recent response to the clients
- ResponseMap map[string]*Response
- // The max number of the recent responses we can record
- ResponseMaxSize int
- // The current number of the recent responses we have recorded
- ResponseCurrSize uint
- // The index of the first recent responses we have
- ResponseStartIndex uint64
- // Current index of the raft machine
- Index uint64
- // Basic statistics information of etcd storage
- BasicStats EtcdStats
- }
// Node is the value half of a key-value pair: the stored string, its
// expiration time, and the channel used to talk to the goroutine that
// enforces that expiration (the countdown runs in its own goroutine, so
// changes have to be communicated over a channel).
type Node struct {
	// Value is the stored string.
	Value string `json:"value"`

	// ExpireTime is PERMANENT (the Unix epoch) for a node that never
	// expires; otherwise the node is deleted once this time has passed.
	ExpireTime time.Time `json:"expireTime"`

	// update delivers new expiration times to the node's expiration
	// goroutine. It is unexported, so encoding/json never encodes it. A
	// json tag on it has no effect and is reported by go vet (structtag),
	// so none is given.
	update chan time.Time
}
// Response describes the outcome of a store command (GET, SET, DELETE, ...).
// It is JSON-encoded for clients, watchers and the messager.
type Response struct {
	Action    string `json:"action"`
	Key       string `json:"key"`
	Dir       bool   `json:"dir,omitempty"`
	PrevValue string `json:"prevValue,omitempty"`
	Value     string `json:"value,omitempty"`

	// NewKey is true when the key did not exist before the action.
	NewKey bool `json:"newKey,omitempty"`

	// Expiration and TTL (remaining lifetime in whole seconds) are filled in
	// only for keys that expire.
	Expiration *time.Time `json:"expiration,omitempty"`
	TTL        int64      `json:"ttl,omitempty"`

	// Index is the raft index at which the command was executed.
	Index uint64 `json:"index"`
}
// ListNode is the simplest key-value pair plus a type tag, used only by list
// operations. The store mimics a file system, so Type tells "files" apart
// from "directories" (the exact strings are chosen by whoever fills it in).
type ListNode struct {
	Key   string
	Value string
	Type  string
}
// PERMANENT is the expiration time given to keys that never expire: the Unix
// epoch. Compare against it with time.Time.Equal rather than ==.
var PERMANENT = time.Unix(0, 0)
- //------------------------------------------------------------------------------
- //
- // Methods
- //
- //------------------------------------------------------------------------------
- // Create a new stroe
- // Arguement max is the max number of response we want to record
- func CreateStore(max int) *Store {
- s := new(Store)
- s.messager = nil
- s.ResponseMap = make(map[string]*Response)
- s.ResponseStartIndex = 0
- s.ResponseMaxSize = max
- s.ResponseCurrSize = 0
- s.Tree = &tree{
- &treeNode{
- Node{
- "/",
- time.Unix(0, 0),
- nil,
- },
- true,
- make(map[string]*treeNode),
- },
- }
- s.watcher = newWatcherHub()
- return s
- }
- // Set the messager of the store
- func (s *Store) SetMessager(messager chan<- string) {
- s.messager = messager
- }
- func (s *Store) Set(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return s.internalSet(key, value, expireTime, index)
- }
- // Set the key to value with expiration time
- func (s *Store) internalSet(key string, value string, expireTime time.Time, index uint64) ([]byte, error) {
- //Update index
- s.Index = index
- //Update stats
- s.BasicStats.Sets++
- key = path.Clean("/" + key)
- isExpire := !expireTime.Equal(PERMANENT)
- // base response
- resp := Response{
- Action: "SET",
- Key: key,
- Value: value,
- Index: index,
- }
- // When the slow follower receive the set command
- // the key may be expired, we should not add the node
- // also if the node exist, we need to delete the node
- if isExpire && expireTime.Sub(time.Now()) < 0 {
- return s.internalDelete(key, index)
- }
- var TTL int64
- // Update ttl
- if isExpire {
- TTL = int64(expireTime.Sub(time.Now()) / time.Second)
- resp.Expiration = &expireTime
- resp.TTL = TTL
- }
- // Get the node
- node, ok := s.Tree.get(key)
- if ok {
- // Update when node exists
- // Node is not permanent
- if !node.ExpireTime.Equal(PERMANENT) {
- // If node is not permanent
- // Update its expireTime
- node.update <- expireTime
- } else {
- // If we want the permanent node to have expire time
- // We need to create a go routine with a channel
- if isExpire {
- node.update = make(chan time.Time)
- go s.monitorExpiration(key, node.update, expireTime)
- }
- }
- // Update the information of the node
- s.Tree.set(key, Node{value, expireTime, node.update})
- resp.PrevValue = node.Value
- s.watcher.notify(resp)
- msg, err := json.Marshal(resp)
- // Send to the messager
- if s.messager != nil && err == nil {
- s.messager <- string(msg)
- }
- s.addToResponseMap(index, &resp)
- return msg, err
- // Add new node
- } else {
- update := make(chan time.Time)
- ok := s.Tree.set(key, Node{value, expireTime, update})
- if !ok {
- return nil, etcdErr.NewError(102, "set: "+key)
- }
- if isExpire {
- go s.monitorExpiration(key, update, expireTime)
- }
- resp.NewKey = true
- msg, err := json.Marshal(resp)
- // Nofity the watcher
- s.watcher.notify(resp)
- // Send to the messager
- if s.messager != nil && err == nil {
- s.messager <- string(msg)
- }
- s.addToResponseMap(index, &resp)
- return msg, err
- }
- }
- // Get the value of the key and return the raw response
- func (s *Store) internalGet(key string) *Response {
- key = path.Clean("/" + key)
- node, ok := s.Tree.get(key)
- if ok {
- var TTL int64
- var isExpire bool = false
- isExpire = !node.ExpireTime.Equal(PERMANENT)
- resp := &Response{
- Action: "GET",
- Key: key,
- Value: node.Value,
- Index: s.Index,
- }
- // Update ttl
- if isExpire {
- TTL = int64(node.ExpireTime.Sub(time.Now()) / time.Second)
- resp.Expiration = &node.ExpireTime
- resp.TTL = TTL
- }
- return resp
- } else {
- // we do not found the key
- return nil
- }
- }
- // Get all the items under key
- // If key is a file return the file
- // If key is a directory reuturn an array of files
- func (s *Store) Get(key string) ([]byte, error) {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- resps, err := s.RawGet(key)
- if err != nil {
- return nil, err
- }
- key = path.Clean("/" + key)
- // If the number of resps == 1 and the response key
- // is the key we query, a signal key-value should
- // be returned
- if len(resps) == 1 && resps[0].Key == key {
- return json.Marshal(resps[0])
- }
- return json.Marshal(resps)
- }
- func (s *Store) rawGetNode(key string, node *Node) ([]*Response, error) {
- resps := make([]*Response, 1)
- isExpire := !node.ExpireTime.Equal(PERMANENT)
- resps[0] = &Response{
- Action: "GET",
- Index: s.Index,
- Key: key,
- Value: node.Value,
- }
- // Update ttl
- if isExpire {
- TTL := int64(node.ExpireTime.Sub(time.Now()) / time.Second)
- resps[0].Expiration = &node.ExpireTime
- resps[0].TTL = TTL
- }
- return resps, nil
- }
- func (s *Store) rawGetNodeList(key string, keys []string, nodes []*Node) ([]*Response, error) {
- resps := make([]*Response, len(nodes))
- // TODO: check if nodes and keys are the same length
- for i := 0; i < len(nodes); i++ {
- var TTL int64
- var isExpire bool = false
- isExpire = !nodes[i].ExpireTime.Equal(PERMANENT)
- resps[i] = &Response{
- Action: "GET",
- Index: s.Index,
- Key: path.Join(key, keys[i]),
- }
- if len(nodes[i].Value) != 0 {
- resps[i].Value = nodes[i].Value
- } else {
- resps[i].Dir = true
- }
- // Update ttl
- if isExpire {
- TTL = int64(nodes[i].ExpireTime.Sub(time.Now()) / time.Second)
- resps[i].Expiration = &nodes[i].ExpireTime
- resps[i].TTL = TTL
- }
- }
- return resps, nil
- }
- func (s *Store) RawGet(key string) ([]*Response, error) {
- // Update stats
- s.BasicStats.Gets++
- key = path.Clean("/" + key)
- nodes, keys, ok := s.Tree.list(key)
- if !ok {
- return nil, etcdErr.NewError(100, "get: "+key)
- }
- switch node := nodes.(type) {
- case *Node:
- return s.rawGetNode(key, node)
- case []*Node:
- return s.rawGetNodeList(key, keys, node)
- default:
- panic("invalid cast ")
- }
- }
- func (s *Store) Delete(key string, index uint64) ([]byte, error) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return s.internalDelete(key, index)
- }
- // Delete the key
- func (s *Store) internalDelete(key string, index uint64) ([]byte, error) {
- // Update stats
- s.BasicStats.Deletes++
- key = path.Clean("/" + key)
- // Update index
- s.Index = index
- node, ok := s.Tree.get(key)
- if !ok {
- return nil, etcdErr.NewError(100, "delete: "+key)
- }
- resp := Response{
- Action: "DELETE",
- Key: key,
- PrevValue: node.Value,
- Index: index,
- }
- if node.ExpireTime.Equal(PERMANENT) {
- s.Tree.delete(key)
- } else {
- resp.Expiration = &node.ExpireTime
- // Kill the expire go routine
- node.update <- PERMANENT
- s.Tree.delete(key)
- }
- msg, err := json.Marshal(resp)
- s.watcher.notify(resp)
- // notify the messager
- if s.messager != nil && err == nil {
- s.messager <- string(msg)
- }
- s.addToResponseMap(index, &resp)
- return msg, err
- }
- // Set the value of the key to the value if the given prevValue is equal to the value of the key
- func (s *Store) TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- // Update stats
- s.BasicStats.TestAndSets++
- resp := s.internalGet(key)
- if resp == nil {
- if prevValue != "" {
- errmsg := fmt.Sprintf("TestAndSet: key not found and previousValue is not empty %s:%s ", key, prevValue)
- return nil, etcdErr.NewError(100, errmsg)
- }
- return s.internalSet(key, value, expireTime, index)
- }
- if resp.Value == prevValue {
- // If test succeed, do set
- return s.internalSet(key, value, expireTime, index)
- } else {
- // If fails, return err
- return nil, etcdErr.NewError(101, fmt.Sprintf("TestAndSet: %s!=%s",
- resp.Value, prevValue))
- }
- }
- // Add a channel to the watchHub.
- // The watchHub will send response to the channel when any key under the prefix
- // changes [since the sinceIndex if given]
- func (s *Store) AddWatcher(prefix string, watcher *Watcher, sinceIndex uint64) error {
- return s.watcher.addWatcher(prefix, watcher, sinceIndex, s.ResponseStartIndex, s.Index, s.ResponseMap)
- }
- // This function should be created as a go routine to delete the key-value pair
- // when it reaches expiration time
- func (s *Store) monitorExpiration(key string, update chan time.Time, expireTime time.Time) {
- duration := expireTime.Sub(time.Now())
- for {
- select {
- // Timeout delete the node
- case <-time.After(duration):
- node, ok := s.Tree.get(key)
- if !ok {
- return
- } else {
- s.mutex.Lock()
- s.Tree.delete(key)
- resp := Response{
- Action: "DELETE",
- Key: key,
- PrevValue: node.Value,
- Expiration: &node.ExpireTime,
- Index: s.Index,
- }
- s.mutex.Unlock()
- msg, err := json.Marshal(resp)
- s.watcher.notify(resp)
- // notify the messager
- if s.messager != nil && err == nil {
- s.messager <- string(msg)
- }
- return
- }
- case updateTime := <-update:
- // Update duration
- // If the node become a permanent one, the go routine is
- // not needed
- if updateTime.Equal(PERMANENT) {
- return
- }
- // Update duration
- duration = updateTime.Sub(time.Now())
- }
- }
- }
- // When we receive a command that will change the state of the key-value store
- // We will add the result of it to the ResponseMap for the use of watch command
- // Also we may remove the oldest response when we add new one
- func (s *Store) addToResponseMap(index uint64, resp *Response) {
- // zero case
- if s.ResponseMaxSize == 0 {
- return
- }
- strIndex := strconv.FormatUint(index, 10)
- s.ResponseMap[strIndex] = resp
- // unlimited
- if s.ResponseMaxSize < 0 {
- s.ResponseCurrSize++
- return
- }
- // if we reach the max point, we need to delete the most latest
- // response and update the startIndex
- if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
- s.ResponseStartIndex++
- delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
- } else {
- s.ResponseCurrSize++
- }
- }
- func (s *Store) clone() *Store {
- newStore := &Store{
- ResponseMaxSize: s.ResponseMaxSize,
- ResponseCurrSize: s.ResponseCurrSize,
- ResponseStartIndex: s.ResponseStartIndex,
- Index: s.Index,
- BasicStats: s.BasicStats,
- }
- newStore.Tree = s.Tree.clone()
- newStore.ResponseMap = make(map[string]*Response)
- for index, response := range s.ResponseMap {
- newStore.ResponseMap[index] = response
- }
- return newStore
- }
- // Save the current state of the storage system
- func (s *Store) Save() ([]byte, error) {
- // first we clone the store
- // json is very slow, we cannot hold the lock for such a long time
- s.mutex.Lock()
- cloneStore := s.clone()
- s.mutex.Unlock()
- b, err := json.Marshal(cloneStore)
- if err != nil {
- fmt.Println(err)
- return nil, err
- }
- return b, nil
- }
- // Recovery the state of the stroage system from a previous state
- func (s *Store) Recovery(state []byte) error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- // we need to stop all the current watchers
- // recovery will clear watcherHub
- s.watcher.stopWatchers()
- err := json.Unmarshal(state, s)
- // The only thing need to change after the recovery is the
- // node with expiration time, we need to delete all the node
- // that have been expired and setup go routines to monitor the
- // other ones
- s.checkExpiration()
- return err
- }
- // Clean the expired nodes
- // Set up go routines to mon
- func (s *Store) checkExpiration() {
- s.Tree.traverse(s.checkNode, false)
- }
- // Check each node
- func (s *Store) checkNode(key string, node *Node) {
- if node.ExpireTime.Equal(PERMANENT) {
- return
- } else {
- if node.ExpireTime.Sub(time.Now()) >= time.Second {
- node.update = make(chan time.Time)
- go s.monitorExpiration(key, node.update, node.ExpireTime)
- } else {
- // we should delete this node
- s.Tree.delete(key)
- }
- }
- }
|