Browse Source

use map to store the results.

Xiang Li 12 years ago
parent
commit
a2b44bf9b7
3 changed files with 56 additions and 21 deletions
  1. 2 0
      etcd.go
  2. 41 9
      store/store.go
  3. 13 12
      store/watcher.go

+ 2 - 0
etcd.go

@@ -48,6 +48,8 @@ var dirPath string
 
 
 var ignore bool
 var ignore bool
 
 
+var responseBufferSize int
+
 func init() {
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 
 

+ 41 - 9
store/store.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"fmt"
 	"path"
 	"path"
 	"time"
 	"time"
+	"strconv"
 )
 )
 
 
 // global store
 // global store
@@ -28,8 +29,13 @@ type Store struct {
 	// now we use it to send changes to the hub of the web service
 	// now we use it to send changes to the hub of the web service
 	messager *chan string
 	messager *chan string
 
 
-	// previous responses
-	Responses [1024]Response
+	// 
+	ResponseMap map[string]Response
+
+	//
+	ResponseMaxSize int
+
+	ResponseCurrSize uint
 
 
 	// at some point, we may need to compact the Response
 	// at some point, we may need to compact the Response
 	ResponseStartIndex uint64
 	ResponseStartIndex uint64
@@ -71,12 +77,15 @@ func init() {
 	s = createStore()
 	s = createStore()
 	s.messager = nil
 	s.messager = nil
 	s.ResponseStartIndex = 0
 	s.ResponseStartIndex = 0
+	s.ResponseMaxSize = 1024
+	s.ResponseCurrSize = 0
 }
 }
 
 
 // make a new stroe
 // make a new stroe
 func createStore() *Store {
 func createStore() *Store {
 	s := new(Store)
 	s := new(Store)
 	s.Nodes = make(map[string]Node)
 	s.Nodes = make(map[string]Node)
+	s.ResponseMap = make(map[string]Response)
 	s.ResponseStartIndex = 0
 	s.ResponseStartIndex = 0
 	return s
 	return s
 }
 }
@@ -155,7 +164,7 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
 			*s.messager <- string(msg)
 			*s.messager <- string(msg)
 		}
 		}
 
 
-		s.Responses[index - s.ResponseStartIndex] = resp
+		updateMap(index, &resp)
 
 
 		return msg, err
 		return msg, err
 
 
@@ -182,10 +191,8 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
 
 
 			*s.messager <- string(msg)
 			*s.messager <- string(msg)
 		}
 		}
-		
-		s.Responses[index - s.ResponseStartIndex] = resp
-		
-		fmt.Println(index - s.ResponseStartIndex)
+
+		updateMap(index, &resp)
 		return msg, err
 		return msg, err
 	}
 	}
 }
 }
@@ -235,6 +242,30 @@ func expire(key string, update chan time.Time, expireTime time.Time) {
 	}
 	}
 }
 }
 
 
+func updateMap(index uint64, resp *Response) {
+
+	if s.ResponseMaxSize == 0 {
+		return
+	}
+
+	strIndex := strconv.FormatUint(index, 10)
+	s.ResponseMap[strIndex] = *resp
+
+	// unlimited
+	if s.ResponseMaxSize < 0{
+		s.ResponseCurrSize++
+		return
+	}
+
+	if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
+		s.ResponseStartIndex++
+		delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
+	} else {
+		s.ResponseCurrSize++
+	}
+}
+
+
 // get the value of the key
 // get the value of the key
 func Get(key string) Response {
 func Get(key string) Response {
 	key = path.Clean(key)
 	key = path.Clean(key)
@@ -296,13 +327,14 @@ func Delete(key string, index uint64) ([]byte, error) {
 			*s.messager <- string(msg)
 			*s.messager <- string(msg)
 		}
 		}
 
 
-		s.Responses[index - s.ResponseStartIndex] = resp
+		updateMap(index, &resp)
 		return msg, err
 		return msg, err
 
 
 	} else {
 	} else {
 
 
 		resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index}
 		resp := Response{DELETE, key, "", "", false, time.Unix(0, 0), 0, index}
-		s.Responses[index - s.ResponseStartIndex] = resp
+
+		updateMap(index, &resp)
 
 
 		return json.Marshal(resp)
 		return json.Marshal(resp)
 	}
 	}

+ 13 - 12
store/watcher.go

@@ -2,8 +2,8 @@ package store
 
 
 import (
 import (
 	"path"
 	"path"
+	"strconv"
 	"strings"
 	"strings"
-	"fmt"
 )
 )
 
 
 type WatcherHub struct {
 type WatcherHub struct {
@@ -41,7 +41,7 @@ func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
 	if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex {
 	if sinceIndex != 0 && sinceIndex >= s.ResponseStartIndex {
 		for i := sinceIndex; i <= s.Index; i++ {
 		for i := sinceIndex; i <= s.Index; i++ {
 			if check(prefix, i) {
 			if check(prefix, i) {
-				c <- s.Responses[i]
+				c <- s.ResponseMap[strconv.FormatUint(i, 10)]
 				return nil
 				return nil
 			}
 			}
 		}
 		}
@@ -66,22 +66,23 @@ func AddWatcher(prefix string, c chan Response, sinceIndex uint64) error {
 	return nil
 	return nil
 }
 }
 
 
-// check if the response has what we are waching
+// check if the response has what we are watching
 func check(prefix string, index uint64) bool {
 func check(prefix string, index uint64) bool {
 
 
-	index = index - s.ResponseStartIndex
+	resp, ok := s.ResponseMap[strconv.FormatUint(index, 10)]
 
 
-	if index < 0 {
+	if !ok {
+		// not storage system command
 		return false
 		return false
-	}
+	} else {
+		path := resp.Key
+		if strings.HasPrefix(path, prefix) {
+			prefixLen := len(prefix)
+			if len(path) == prefixLen || path[prefixLen] == '/' {
+				return true
+			}
 
 
-	path := s.Responses[index].Key
-	if strings.HasPrefix(path, prefix) {
-		prefixLen := len(prefix)
-		if len(path) == prefixLen || path[prefixLen] == '/' {
-			return true
 		}
 		}
-
 	}
 	}
 
 
 	return false
 	return false