Browse Source

Merge pull request #276 from xiangli-cmu/feat-kvpair-ttl

feat kvpair include ttl
Xiang Li 12 years ago
parent
commit
0e15962cef
5 changed files with 46 additions and 26 deletions
  1. 12 2
      server/v2/tests/get_handler_test.go
  2. 10 4
      store/kv_pairs.go
  3. 4 1
      store/node.go
  4. 12 11
      store/store_test.go
  5. 8 8
      tests/server_utils.go

+ 12 - 2
server/v2/tests/get_handler_test.go

@@ -42,6 +42,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v := url.Values{}
 		v.Set("value", "XXX")
 		v.Set("value", "XXX")
+		v.Set("ttl", "10")
 		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/x"), v)
 		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/x"), v)
 		tests.ReadBody(resp)
 		tests.ReadBody(resp)
 
 
@@ -60,6 +61,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
 		kv0 := body["kvs"].([]interface{})[0].(map[string]interface{})
 		kv0 := body["kvs"].([]interface{})[0].(map[string]interface{})
 		assert.Equal(t, kv0["key"], "/foo/x", "")
 		assert.Equal(t, kv0["key"], "/foo/x", "")
 		assert.Equal(t, kv0["value"], "XXX", "")
 		assert.Equal(t, kv0["value"], "XXX", "")
+		assert.Equal(t, kv0["ttl"], 10, "")
 
 
 		kv1 := body["kvs"].([]interface{})[1].(map[string]interface{})
 		kv1 := body["kvs"].([]interface{})[1].(map[string]interface{})
 		assert.Equal(t, kv1["key"], "/foo/y", "")
 		assert.Equal(t, kv1["key"], "/foo/y", "")
@@ -105,7 +107,6 @@ func TestV2WatchKey(t *testing.T) {
 	})
 	})
 }
 }
 
 
-
 // Ensures that a watcher can wait for a value to be set after a given index.
 // Ensures that a watcher can wait for a value to be set after a given index.
 //
 //
 //   $ curl localhost:4001/v2/keys/foo/bar?wait=true&waitIndex=4
 //   $ curl localhost:4001/v2/keys/foo/bar?wait=true&waitIndex=4
@@ -115,9 +116,11 @@ func TestV2WatchKey(t *testing.T) {
 func TestV2WatchKeyWithIndex(t *testing.T) {
 func TestV2WatchKeyWithIndex(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 	tests.RunServer(func(s *server.Server) {
 		var body map[string]interface{}
 		var body map[string]interface{}
+		c := make(chan bool)
 		go func() {
 		go func() {
 			resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=5"))
 			resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=5"))
 			body = tests.ReadBodyJSON(resp)
 			body = tests.ReadBodyJSON(resp)
+			c <- true
 		}()
 		}()
 
 
 		// Make sure response didn't fire early.
 		// Make sure response didn't fire early.
@@ -141,6 +144,14 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 
 
 		// A response should follow from the GET above.
 		// A response should follow from the GET above.
 		time.Sleep(1 * time.Millisecond)
 		time.Sleep(1 * time.Millisecond)
+
+		select {
+		case <-c:
+
+		default:
+			t.Fatal("cannot get watch result")
+		}
+
 		assert.NotNil(t, body, "")
 		assert.NotNil(t, body, "")
 		assert.Equal(t, body["action"], "set", "")
 		assert.Equal(t, body["action"], "set", "")
 		assert.Equal(t, body["key"], "/foo/bar", "")
 		assert.Equal(t, body["key"], "/foo/bar", "")
@@ -149,4 +160,3 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 		assert.Equal(t, body["term"], 0, "")
 		assert.Equal(t, body["term"], 0, "")
 	})
 	})
 }
 }
-

+ 10 - 4
store/kv_pairs.go

@@ -1,11 +1,17 @@
 package store
 package store
 
 
+import (
+	"time"
+)
+
 // When user list a directory, we add all the node into key-value pair slice
 // When user list a directory, we add all the node into key-value pair slice
 type KeyValuePair struct {
 type KeyValuePair struct {
-	Key     string  `json:"key, omitempty"`
-	Value   string  `json:"value,omitempty"`
-	Dir     bool    `json:"dir,omitempty"`
-	KVPairs kvPairs `json:"kvs,omitempty"`
+	Key        string     `json:"key, omitempty"`
+	Value      string     `json:"value,omitempty"`
+	Dir        bool       `json:"dir,omitempty"`
+	Expiration *time.Time `json:"expiration,omitempty"`
+	TTL        int64      `json:"ttl,omitempty"` // Time to live in second
+	KVPairs    kvPairs    `json:"kvs,omitempty"`
 }
 }
 
 
 type kvPairs []KeyValuePair
 type kvPairs []KeyValuePair

+ 4 - 1
store/node.go

@@ -322,6 +322,7 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 			Key: n.Path,
 			Key: n.Path,
 			Dir: true,
 			Dir: true,
 		}
 		}
+		pair.Expiration, pair.TTL = n.ExpirationAndTTL()
 
 
 		if !recurisive {
 		if !recurisive {
 			return pair
 			return pair
@@ -354,10 +355,12 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 		return pair
 		return pair
 	}
 	}
 
 
-	return KeyValuePair{
+	pair := KeyValuePair{
 		Key:   n.Path,
 		Key:   n.Path,
 		Value: n.Value,
 		Value: n.Value,
 	}
 	}
+	pair.Expiration, pair.TTL = n.ExpirationAndTTL()
+	return pair
 }
 }
 
 
 func (n *Node) UpdateTTL(expireTime time.Time) {
 func (n *Node) UpdateTTL(expireTime time.Time) {

+ 12 - 11
store/store_test.go

@@ -29,6 +29,7 @@ func TestStoreGetDirectory(t *testing.T) {
 	s.Create("/foo/baz", "", false, Permanent, 5, 1)
 	s.Create("/foo/baz", "", false, Permanent, 5, 1)
 	s.Create("/foo/baz/bat", "Y", false, Permanent, 6, 1)
 	s.Create("/foo/baz/bat", "Y", false, Permanent, 6, 1)
 	s.Create("/foo/baz/_hidden", "*", false, Permanent, 7, 1)
 	s.Create("/foo/baz/_hidden", "*", false, Permanent, 7, 1)
+	s.Create("/foo/baz/ttl", "Y", false, time.Now().Add(time.Second*3), 8, 1)
 	e, err := s.Get("/foo", true, false, 8, 1)
 	e, err := s.Get("/foo", true, false, 8, 1)
 	assert.Nil(t, err, "")
 	assert.Nil(t, err, "")
 	assert.Equal(t, e.Action, "get", "")
 	assert.Equal(t, e.Action, "get", "")
@@ -39,10 +40,14 @@ func TestStoreGetDirectory(t *testing.T) {
 	assert.Equal(t, e.KVPairs[0].Dir, false, "")
 	assert.Equal(t, e.KVPairs[0].Dir, false, "")
 	assert.Equal(t, e.KVPairs[1].Key, "/foo/baz", "")
 	assert.Equal(t, e.KVPairs[1].Key, "/foo/baz", "")
 	assert.Equal(t, e.KVPairs[1].Dir, true, "")
 	assert.Equal(t, e.KVPairs[1].Dir, true, "")
-	assert.Equal(t, len(e.KVPairs[1].KVPairs), 1, "")
+	assert.Equal(t, len(e.KVPairs[1].KVPairs), 2, "")
 	assert.Equal(t, e.KVPairs[1].KVPairs[0].Key, "/foo/baz/bat", "")
 	assert.Equal(t, e.KVPairs[1].KVPairs[0].Key, "/foo/baz/bat", "")
 	assert.Equal(t, e.KVPairs[1].KVPairs[0].Value, "Y", "")
 	assert.Equal(t, e.KVPairs[1].KVPairs[0].Value, "Y", "")
 	assert.Equal(t, e.KVPairs[1].KVPairs[0].Dir, false, "")
 	assert.Equal(t, e.KVPairs[1].KVPairs[0].Dir, false, "")
+	assert.Equal(t, e.KVPairs[1].KVPairs[1].Key, "/foo/baz/ttl", "")
+	assert.Equal(t, e.KVPairs[1].KVPairs[1].Value, "Y", "")
+	assert.Equal(t, e.KVPairs[1].KVPairs[1].Dir, false, "")
+	assert.Equal(t, e.KVPairs[1].KVPairs[1].TTL, 3, "")
 }
 }
 
 
 // Ensure that the store can retrieve a directory in sorted order.
 // Ensure that the store can retrieve a directory in sorted order.
@@ -63,7 +68,6 @@ func TestStoreGetSorted(t *testing.T) {
 	assert.Equal(t, e.KVPairs[2].Key, "/foo/z", "")
 	assert.Equal(t, e.KVPairs[2].Key, "/foo/z", "")
 }
 }
 
 
-
 // Ensure that the store can create a new key if it doesn't already exist.
 // Ensure that the store can create a new key if it doesn't already exist.
 func TestStoreCreateValue(t *testing.T) {
 func TestStoreCreateValue(t *testing.T) {
 	s := newStore()
 	s := newStore()
@@ -139,10 +143,10 @@ func TestStoreUpdateFailsIfDirectory(t *testing.T) {
 func TestStoreUpdateValueTTL(t *testing.T) {
 func TestStoreUpdateValueTTL(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", "bar", false, Permanent, 2, 1)
 	s.Create("/foo", "bar", false, Permanent, 2, 1)
-	_, err := s.Update("/foo", "baz", time.Now().Add(1 * time.Millisecond), 3, 1)
+	_, err := s.Update("/foo", "baz", time.Now().Add(1*time.Millisecond), 3, 1)
 	e, _ := s.Get("/foo", false, false, 3, 1)
 	e, _ := s.Get("/foo", false, false, 3, 1)
 	assert.Equal(t, e.Value, "baz", "")
 	assert.Equal(t, e.Value, "baz", "")
-	
+
 	time.Sleep(2 * time.Millisecond)
 	time.Sleep(2 * time.Millisecond)
 	e, err = s.Get("/foo", false, false, 3, 1)
 	e, err = s.Get("/foo", false, false, 3, 1)
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
@@ -154,10 +158,10 @@ func TestStoreUpdateDirTTL(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", "", false, Permanent, 2, 1)
 	s.Create("/foo", "", false, Permanent, 2, 1)
 	s.Create("/foo/bar", "baz", false, Permanent, 3, 1)
 	s.Create("/foo/bar", "baz", false, Permanent, 3, 1)
-	_, err := s.Update("/foo", "", time.Now().Add(1 * time.Millisecond), 3, 1)
+	_, err := s.Update("/foo", "", time.Now().Add(1*time.Millisecond), 3, 1)
 	e, _ := s.Get("/foo/bar", false, false, 3, 1)
 	e, _ := s.Get("/foo/bar", false, false, 3, 1)
 	assert.Equal(t, e.Value, "baz", "")
 	assert.Equal(t, e.Value, "baz", "")
-	
+
 	time.Sleep(2 * time.Millisecond)
 	time.Sleep(2 * time.Millisecond)
 	e, err = s.Get("/foo/bar", false, false, 3, 1)
 	e, err = s.Get("/foo/bar", false, false, 3, 1)
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
@@ -193,7 +197,6 @@ func TestStoreDeleteDiretoryFailsIfNonRecursive(t *testing.T) {
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
 }
 }
 
 
-
 // Ensure that the store can conditionally update a key if it has a previous value.
 // Ensure that the store can conditionally update a key if it has a previous value.
 func TestStoreCompareAndSwapPrevValue(t *testing.T) {
 func TestStoreCompareAndSwapPrevValue(t *testing.T) {
 	s := newStore()
 	s := newStore()
@@ -337,7 +340,7 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
 // Ensure that the store can watch for key expiration.
 // Ensure that the store can watch for key expiration.
 func TestStoreWatchExpire(t *testing.T) {
 func TestStoreWatchExpire(t *testing.T) {
 	s := newStore()
 	s := newStore()
-	s.Create("/foo", "bar", false, time.Now().Add(1 * time.Millisecond), 2, 1)
+	s.Create("/foo", "bar", false, time.Now().Add(1*time.Millisecond), 2, 1)
 	c, _ := s.Watch("/foo", false, 0, 0, 1)
 	c, _ := s.Watch("/foo", false, 0, 0, 1)
 	e := nbselect(c)
 	e := nbselect(c)
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
@@ -372,7 +375,7 @@ func TestStoreRecoverWithExpiration(t *testing.T) {
 	s := newStore()
 	s := newStore()
 	s.Create("/foo", "", false, Permanent, 2, 1)
 	s.Create("/foo", "", false, Permanent, 2, 1)
 	s.Create("/foo/x", "bar", false, Permanent, 3, 1)
 	s.Create("/foo/x", "bar", false, Permanent, 3, 1)
-	s.Create("/foo/y", "baz", false, time.Now().Add(5 * time.Millisecond), 4, 1)
+	s.Create("/foo/y", "baz", false, time.Now().Add(5*time.Millisecond), 4, 1)
 	b, err := s.Save()
 	b, err := s.Save()
 
 
 	time.Sleep(10 * time.Millisecond)
 	time.Sleep(10 * time.Millisecond)
@@ -389,8 +392,6 @@ func TestStoreRecoverWithExpiration(t *testing.T) {
 	assert.Nil(t, e, "")
 	assert.Nil(t, e, "")
 }
 }
 
 
-
-
 // Performs a non-blocking select on an event channel.
 // Performs a non-blocking select on an event channel.
 func nbselect(c <-chan *Event) *Event {
 func nbselect(c <-chan *Event) *Event {
 	select {
 	select {

+ 8 - 8
tests/server_utils.go

@@ -5,14 +5,14 @@ import (
 	"os"
 	"os"
 	"time"
 	"time"
 
 
-	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/server"
 	"github.com/coreos/etcd/server"
+	"github.com/coreos/etcd/store"
 )
 )
 
 
 const (
 const (
-	testName = "ETCDTEST"
+	testName      = "ETCDTEST"
 	testClientURL = "localhost:4401"
 	testClientURL = "localhost:4401"
-	testRaftURL = "localhost:7701"
+	testRaftURL   = "localhost:7701"
 )
 )
 
 
 // Starts a server in a temporary directory.
 // Starts a server in a temporary directory.
@@ -22,8 +22,8 @@ func RunServer(f func(*server.Server)) {
 
 
 	store := store.New()
 	store := store.New()
 	registry := server.NewRegistry(store)
 	registry := server.NewRegistry(store)
-	ps := server.NewPeerServer(testName, path, testRaftURL, testRaftURL, &server.TLSConfig{Scheme:"http"}, &server.TLSInfo{}, registry, store)
-	s := server.New(testName, testClientURL, testClientURL, &server.TLSConfig{Scheme:"http"}, &server.TLSInfo{}, ps, registry, store)
+	ps := server.NewPeerServer(testName, path, testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store)
+	s := server.New(testName, testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	ps.SetServer(s)
 	ps.SetServer(s)
 
 
 	// Start up peer server.
 	// Start up peer server.
@@ -32,17 +32,17 @@ func RunServer(f func(*server.Server)) {
 		c <- true
 		c <- true
 		ps.ListenAndServe(false, []string{})
 		ps.ListenAndServe(false, []string{})
 	}()
 	}()
-	<- c
+	<-c
 
 
 	// Start up etcd server.
 	// Start up etcd server.
 	go func() {
 	go func() {
 		c <- true
 		c <- true
 		s.ListenAndServe()
 		s.ListenAndServe()
 	}()
 	}()
-	<- c
+	<-c
 
 
 	// Wait to make sure servers have started.
 	// Wait to make sure servers have started.
-	time.Sleep(5 * time.Millisecond)
+	time.Sleep(50 * time.Millisecond)
 
 
 	// Execute the function passed in.
 	// Execute the function passed in.
 	f(s)
 	f(s)