Browse Source

feat(store) create node with incremental suffix. accept #190 in new API

Xiang Li 12 years ago
parent
commit
b8b81d5b03
5 changed files with 58 additions and 43 deletions
  1. 6 5
      command.go
  2. 9 2
      etcd_handlers.go
  3. 2 2
      store/stats_test.go
  4. 11 4
      store/store.go
  5. 30 30
      store/store_test.go

+ 6 - 5
command.go

@@ -27,9 +27,10 @@ type Command interface {
 
 // Create command
 type CreateCommand struct {
-	Key        string    `json:"key"`
-	Value      string    `json:"value"`
-	ExpireTime time.Time `json:"expireTime"`
+	Key               string    `json:"key"`
+	Value             string    `json:"value"`
+	ExpireTime        time.Time `json:"expireTime"`
+	IncrementalSuffix bool      `json:"incrementalSuffix"`
 }
 
 // The name of the create command in the log
@@ -39,7 +40,7 @@ func (c *CreateCommand) CommandName() string {
 
 // Create node
 func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
-	e, err := etcdStore.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+	e, err := etcdStore.Create(c.Key, c.Value, c.IncrementalSuffix, c.ExpireTime, server.CommitIndex(), server.Term())
 
 	if err != nil {
 		debug(err)
@@ -221,7 +222,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	// add machine in etcd storage
 	key := path.Join("_etcd/machines", c.Name)
 	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
-	etcdStore.Create(key, value, store.Permanent, raftServer.CommitIndex(), raftServer.Term())
+	etcdStore.Create(key, value, false, store.Permanent, raftServer.CommitIndex(), raftServer.Term())
 
 	// add peer stats
 	if c.Name != r.Name() {

+ 9 - 2
etcd_handlers.go

@@ -75,6 +75,7 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error {
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		return nil
 	}
+
 	return nil
 }
 
@@ -102,6 +103,10 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		ExpireTime: expireTime,
 	}
 
+	if req.FormValue("incremental") == "true" {
+		command.IncrementalSuffix = true
+	}
+
 	return dispatchEtcdCommand(command, w, req)
 
 }
@@ -201,6 +206,7 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		w.WriteHeader(http.StatusOK)
 		raftURL, _ := nameToRaftURL(leader)
 		w.Write([]byte(raftURL))
+
 		return nil
 	} else {
 		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "")
@@ -213,6 +219,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(strings.Join(machines, ", ")))
+
 	return nil
 }
 
@@ -220,6 +227,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
 func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	w.WriteHeader(http.StatusOK)
 	fmt.Fprintf(w, "etcd %s", releaseVersion)
+
 	return nil
 }
 
@@ -277,7 +285,6 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		}
 
 		indexStr := req.FormValue("wait_index")
-
 		if indexStr != "" {
 			sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
 
@@ -297,7 +304,6 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		}
 
 		sorted := req.FormValue("sorted")
-
 		if sorted == "true" {
 			command.Sorted = true
 		}
@@ -330,6 +336,7 @@ func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
 		directSet()
 		w.WriteHeader(http.StatusOK)
 		w.Write([]byte("speed test success"))
+
 		return
 	}
 

+ 2 - 2
store/stats_test.go

@@ -16,7 +16,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Create(k, "bar", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
+		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 			SetFail++
 		} else {
@@ -146,7 +146,7 @@ func TestBasicStats(t *testing.T) {
 
 	for _, k := range keys {
 		i++
-		_, err := s.Create(k, "bar", time.Now().Add(time.Second*3), i, 1)
+		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1)
 		if err != nil {
 			SetFail++
 		} else {

+ 11 - 4
store/store.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"path"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -94,10 +95,12 @@ func (s *Store) Get(nodePath string, recursive, sorted bool, index uint64, term
 // Create function creates the Node at nodePath. Create will help to create intermediate directories with no ttl.
 // If the node has already existed, create will fail.
 // If any node on the path is a file, create will fail.
-func (s *Store) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+func (s *Store) Create(nodePath string, value string, incrementalSuffix bool,
+	expireTime time.Time, index uint64, term uint64) (*Event, error) {
+
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
-	return s.internalCreate(nodePath, value, expireTime, index, term, Create)
+	return s.internalCreate(nodePath, value, incrementalSuffix, expireTime, index, term, Create)
 }
 
 // Update function updates the value/ttl of the node.
@@ -155,7 +158,7 @@ func (s *Store) TestAndSet(nodePath string, prevValue string, prevIndex uint64,
 	defer s.worldLock.Unlock()
 
 	if prevValue == "" && prevIndex == 0 { // try create
-		return s.internalCreate(nodePath, value, expireTime, index, term, TestAndSet)
+		return s.internalCreate(nodePath, value, false, expireTime, index, term, TestAndSet)
 	}
 
 	n, err := s.internalGet(nodePath, index, term)
@@ -262,7 +265,11 @@ func (s *Store) walk(nodePath string, walkFunc func(prev *Node, component string
 	return curr, nil
 }
 
-func (s *Store) internalCreate(nodePath string, value string, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
+func (s *Store) internalCreate(nodePath string, value string, incrementalSuffix bool, expireTime time.Time, index uint64, term uint64, action string) (*Event, error) {
+	if incrementalSuffix { // append unique incremental suffix to the node path
+		nodePath += "_" + strconv.FormatUint(index, 10)
+	}
+
 	nodePath = path.Clean(path.Join("/", nodePath))
 
 	// make sure we can create the node

+ 30 - 30
store/store_test.go

@@ -10,10 +10,10 @@ import (
 func TestCreateAndGet(t *testing.T) {
 	s := New()
 
-	s.Create("/foobar", "bar", Permanent, 1, 1)
+	s.Create("/foobar", "bar", false, Permanent, 1, 1)
 
 	// already exist, create should fail
-	_, err := s.Create("/foobar", "bar", Permanent, 1, 1)
+	_, err := s.Create("/foobar", "bar", false, Permanent, 1, 1)
 
 	if err == nil {
 		t.Fatal("Create should fail")
@@ -27,14 +27,14 @@ func TestCreateAndGet(t *testing.T) {
 	createAndGet(s, "/foo/foo/bar", t)
 
 	// meet file, create should fail
-	_, err = s.Create("/foo/bar/bar", "bar", Permanent, 2, 1)
+	_, err = s.Create("/foo/bar/bar", "bar", false, Permanent, 2, 1)
 
 	if err == nil {
 		t.Fatal("Create should fail")
 	}
 
 	// create a directory
-	_, err = s.Create("/fooDir", "", Permanent, 3, 1)
+	_, err = s.Create("/fooDir", "", false, Permanent, 3, 1)
 
 	if err != nil {
 		t.Fatal("Cannot create /fooDir")
@@ -47,7 +47,7 @@ func TestCreateAndGet(t *testing.T) {
 	}
 
 	// create a file under directory
-	_, err = s.Create("/fooDir/bar", "bar", Permanent, 4, 1)
+	_, err = s.Create("/fooDir/bar", "bar", false, Permanent, 4, 1)
 
 	if err != nil {
 		t.Fatal("Cannot create /fooDir/bar = bar")
@@ -57,7 +57,7 @@ func TestCreateAndGet(t *testing.T) {
 func TestUpdateFile(t *testing.T) {
 	s := New()
 
-	_, err := s.Create("/foo/bar", "bar", Permanent, 1, 1)
+	_, err := s.Create("/foo/bar", "bar", false, Permanent, 1, 1)
 
 	if err != nil {
 		t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error())
@@ -80,24 +80,24 @@ func TestUpdateFile(t *testing.T) {
 	}
 
 	// create a directory, update its ttl, to see if it will be deleted
-	_, err = s.Create("/foo/foo", "", Permanent, 3, 1)
+	_, err = s.Create("/foo/foo", "", false, Permanent, 3, 1)
 
 	if err != nil {
 		t.Fatalf("cannot create dir [%s] [%s]", "/foo/foo", err.Error())
 	}
 
-	_, err = s.Create("/foo/foo/foo1", "bar1", Permanent, 4, 1)
+	_, err = s.Create("/foo/foo/foo1", "bar1", false, Permanent, 4, 1)
 
 	if err != nil {
 		t.Fatal("cannot create [%s]", err.Error())
 	}
 
-	_, err = s.Create("/foo/foo/foo2", "", Permanent, 5, 1)
+	_, err = s.Create("/foo/foo/foo2", "", false, Permanent, 5, 1)
 	if err != nil {
 		t.Fatal("cannot create [%s]", err.Error())
 	}
 
-	_, err = s.Create("/foo/foo/foo2/boo", "boo1", Permanent, 6, 1)
+	_, err = s.Create("/foo/foo/foo2/boo", "boo1", false, Permanent, 6, 1)
 	if err != nil {
 		t.Fatal("cannot create [%s]", err.Error())
 	}
@@ -158,11 +158,11 @@ func TestListDirectory(t *testing.T) {
 
 	// create dir /foo
 	// set key-value /foo/foo=bar
-	s.Create("/foo/foo", "bar", Permanent, 1, 1)
+	s.Create("/foo/foo", "bar", false, Permanent, 1, 1)
 
 	// create dir /foo/fooDir
 	// set key-value /foo/fooDir/foo=bar
-	s.Create("/foo/fooDir/foo", "bar", Permanent, 2, 1)
+	s.Create("/foo/fooDir/foo", "bar", false, Permanent, 2, 1)
 
 	e, err := s.Get("/foo", true, false, 2, 1)
 
@@ -189,7 +189,7 @@ func TestListDirectory(t *testing.T) {
 
 	// create dir /foo/_hidden
 	// set key-value /foo/_hidden/foo -> bar
-	s.Create("/foo/_hidden/foo", "bar", Permanent, 3, 1)
+	s.Create("/foo/_hidden/foo", "bar", false, Permanent, 3, 1)
 
 	e, _ = s.Get("/foo", false, false, 2, 1)
 
@@ -201,7 +201,7 @@ func TestListDirectory(t *testing.T) {
 func TestRemove(t *testing.T) {
 	s := New()
 
-	s.Create("/foo", "bar", Permanent, 1, 1)
+	s.Create("/foo", "bar", false, Permanent, 1, 1)
 	_, err := s.Delete("/foo", false, 1, 1)
 
 	if err != nil {
@@ -214,9 +214,9 @@ func TestRemove(t *testing.T) {
 		t.Fatalf("can get the node after deletion")
 	}
 
-	s.Create("/foo/bar", "bar", Permanent, 1, 1)
-	s.Create("/foo/car", "car", Permanent, 1, 1)
-	s.Create("/foo/dar/dar", "dar", Permanent, 1, 1)
+	s.Create("/foo/bar", "bar", false, Permanent, 1, 1)
+	s.Create("/foo/car", "car", false, Permanent, 1, 1)
+	s.Create("/foo/dar/dar", "dar", false, Permanent, 1, 1)
 
 	_, err = s.Delete("/foo", false, 1, 1)
 
@@ -242,7 +242,7 @@ func TestExpire(t *testing.T) {
 
 	expire := time.Now().Add(time.Second)
 
-	s.Create("/foo", "bar", expire, 1, 1)
+	s.Create("/foo", "bar", false, expire, 1, 1)
 
 	_, err := s.Get("/foo", false, false, 1, 1)
 
@@ -260,7 +260,7 @@ func TestExpire(t *testing.T) {
 
 	// test if we can reach the node before expiration
 	expire = time.Now().Add(time.Second)
-	s.Create("/foo", "bar", expire, 1, 1)
+	s.Create("/foo", "bar", false, expire, 1, 1)
 
 	time.Sleep(time.Millisecond * 50)
 	_, err = s.Get("/foo", false, false, 1, 1)
@@ -271,7 +271,7 @@ func TestExpire(t *testing.T) {
 
 	expire = time.Now().Add(time.Second)
 
-	s.Create("/foo", "bar", expire, 1, 1)
+	s.Create("/foo", "bar", false, expire, 1, 1)
 	_, err = s.Delete("/foo", false, 1, 1)
 
 	if err != nil {
@@ -281,7 +281,7 @@ func TestExpire(t *testing.T) {
 
 func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
 	s := New()
-	s.Create("/foo", "bar", Permanent, 1, 1)
+	s.Create("/foo", "bar", false, Permanent, 1, 1)
 
 	// test on wrong previous value
 	_, err := s.TestAndSet("/foo", "barbar", 0, "car", Permanent, 2, 1)
@@ -316,7 +316,7 @@ func TestWatch(t *testing.T) {
 	s := New()
 	// watch at a deeper path
 	c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1)
-	s.Create("/foo/foo/foo", "bar", Permanent, 1, 1)
+	s.Create("/foo/foo/foo", "bar", false, Permanent, 1, 1)
 
 	e := nonblockingRetrive(c)
 	if e.Key != "/foo/foo/foo" || e.Action != Create {
@@ -346,7 +346,7 @@ func TestWatch(t *testing.T) {
 
 	// watch at a prefix
 	c, _ = s.Watch("/foo", true, 0, 4, 1)
-	s.Create("/foo/foo/boo", "bar", Permanent, 5, 1)
+	s.Create("/foo/foo/boo", "bar", false, Permanent, 5, 1)
 	e = nonblockingRetrive(c)
 	if e.Key != "/foo/foo/boo" || e.Action != Create {
 		t.Fatal("watch for Create subdirectory fails")
@@ -374,7 +374,7 @@ func TestWatch(t *testing.T) {
 	}
 
 	// watch expire
-	s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1)
+	s.Create("/foo/foo/boo", "foo", false, time.Now().Add(time.Second*1), 9, 1)
 	c, _ = s.Watch("/foo", true, 0, 9, 1)
 	time.Sleep(time.Second * 2)
 	e = nonblockingRetrive(c)
@@ -382,7 +382,7 @@ func TestWatch(t *testing.T) {
 		t.Fatal("watch for Expiration of Create() subdirectory fails ", e)
 	}
 
-	s.Create("/foo/foo/boo", "foo", Permanent, 10, 1)
+	s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1)
 	s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
 	c, _ = s.Watch("/foo", true, 0, 11, 1)
 	time.Sleep(time.Second * 2)
@@ -391,7 +391,7 @@ func TestWatch(t *testing.T) {
 		t.Fatal("watch for Expiration of Update() subdirectory fails ", e)
 	}
 
-	s.Create("/foo/foo/boo", "foo", Permanent, 12, 1)
+	s.Create("/foo/foo/boo", "foo", false, Permanent, 12, 1)
 	s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1)
 	c, _ = s.Watch("/foo", true, 0, 13, 1)
 	time.Sleep(time.Second * 2)
@@ -409,7 +409,7 @@ func TestSort(t *testing.T) {
 
 	i := uint64(1)
 	for _, k := range keys {
-		_, err := s.Create(k, "bar", Permanent, i, 1)
+		_, err := s.Create(k, "bar", false, Permanent, i, 1)
 		if err != nil {
 			panic(err)
 		} else {
@@ -447,7 +447,7 @@ func TestSaveAndRecover(t *testing.T) {
 
 	i := uint64(1)
 	for _, k := range keys {
-		_, err := s.Create(k, "bar", Permanent, i, 1)
+		_, err := s.Create(k, "bar", false, Permanent, i, 1)
 		if err != nil {
 			panic(err)
 		} else {
@@ -459,7 +459,7 @@ func TestSaveAndRecover(t *testing.T) {
 	// test if we can reach the node before expiration
 
 	expire := time.Now().Add(time.Second)
-	s.Create("/foo/foo", "bar", expire, 1, 1)
+	s.Create("/foo/foo", "bar", false, expire, 1, 1)
 	b, err := s.Save()
 
 	cloneFs := New()
@@ -514,7 +514,7 @@ func GenKeys(num int, depth int) []string {
 }
 
 func createAndGet(s *Store, path string, t *testing.T) {
-	_, err := s.Create(path, "bar", Permanent, 1, 1)
+	_, err := s.Create(path, "bar", false, Permanent, 1, 1)
 
 	if err != nil {
 		t.Fatalf("cannot create %s=bar [%s]", path, err.Error())