Browse Source

refactor put_handler.go

Xiang Li 12 years ago
parent
commit
0392c18794
6 changed files with 135 additions and 57 deletions
  1. 35 12
      server/v2/put_handler.go
  2. 23 13
      store/stats.go
  3. 13 13
      store/stats_test.go
  4. 22 14
      store/store.go
  5. 5 5
      store/store_test.go
  6. 37 0
      store/update_command.go

+ 35 - 12
server/v2/put_handler.go

@@ -3,6 +3,7 @@ package v2
 import (
 import (
 	"net/http"
 	"net/http"
 	"strconv"
 	"strconv"
+	"time"
 
 
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
@@ -33,14 +34,9 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 
 
 	var c raft.Command
 	var c raft.Command
 
 
-	// Set command: create a new node or replace the old one.
+	// Set handler: create a new node or replace the old one.
 	if !valueOk && !indexOk && !existOk {
 	if !valueOk && !indexOk && !existOk {
-		c = &store.SetCommand{
-			Key:        key,
-			Value:      value,
-			ExpireTime: expireTime,
-		}
-		return s.Dispatch(c, w, req)
+		return SetHandler(w, req, s, key, value, expireTime)
 	}
 	}
 
 
 	// update with test
 	// update with test
@@ -48,11 +44,11 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 		if prevExist[0] == "false" {
 		if prevExist[0] == "false" {
 			// Create command: create a new node. Fail, if a node already exists
 			// Create command: create a new node. Fail, if a node already exists
 			// Ignore prevIndex and prevValue
 			// Ignore prevIndex and prevValue
-			c = &store.CreateCommand{
-				Key:        key,
-				Value:      value,
-				ExpireTime: expireTime,
-			}
+			return CreateHandler(w, req, s, key, value, expireTime)
+		}
+
+		if prevExist[0] == "true" && !indexOk && !valueOk {
+			return UpdateHandler(w, req, s, key, value, expireTime)
 		}
 		}
 	}
 	}
 
 
@@ -84,3 +80,30 @@ func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 
 
 	return s.Dispatch(c, w, req)
 	return s.Dispatch(c, w, req)
 }
 }
+
+func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
+	c := &store.SetCommand{
+		Key:        key,
+		Value:      value,
+		ExpireTime: expireTime,
+	}
+	return s.Dispatch(c, w, req)
+}
+
+func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
+	c := &store.CreateCommand{
+		Key:        key,
+		Value:      value,
+		ExpireTime: expireTime,
+	}
+	return s.Dispatch(c, w, req)
+}
+
+func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
+	c := &store.UpdateCommand{
+		Key:        key,
+		Value:      value,
+		ExpireTime: expireTime,
+	}
+	return s.Dispatch(c, w, req)
+}

+ 23 - 13
store/stats.go

@@ -6,17 +6,19 @@ import (
 )
 )
 
 
 const (
 const (
-	SetSuccess            = 100
-	SetFail               = 101
-	DeleteSuccess         = 102
-	DeleteFail            = 103
-	UpdateSuccess         = 104
-	UpdateFail            = 105
-	CompareAndSwapSuccess = 106
-	CompareAndSwapFail    = 107
-	GetSuccess            = 110
-	GetFail               = 111
-	ExpireCount           = 112
+	SetSuccess = iota
+	SetFail
+	DeleteSuccess
+	DeleteFail
+	CreateSuccess
+	CreateFail
+	UpdateSuccess
+	UpdateFail
+	CompareAndSwapSuccess
+	CompareAndSwapFail
+	GetSuccess
+	GetFail
+	ExpireCount
 )
 )
 
 
 type Stats struct {
 type Stats struct {
@@ -37,6 +39,10 @@ type Stats struct {
 	UpdateSuccess uint64 `json:"updateSuccess"`
 	UpdateSuccess uint64 `json:"updateSuccess"`
 	UpdateFail    uint64 `json:"updateFail"`
 	UpdateFail    uint64 `json:"updateFail"`
 
 
+	// Number of create requests
+	CreateSuccess uint64 `json:"createSuccess"`
+	CreateFail    uint64 `json:createFail`
+
 	// Number of testAndSet requests
 	// Number of testAndSet requests
 	CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"`
 	CompareAndSwapSuccess uint64 `json:"compareAndSwapSuccess"`
 	CompareAndSwapFail    uint64 `json:"compareAndSwapFail"`
 	CompareAndSwapFail    uint64 `json:"compareAndSwapFail"`
@@ -53,8 +59,8 @@ func newStats() *Stats {
 
 
 func (s *Stats) clone() *Stats {
 func (s *Stats) clone() *Stats {
 	return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
 	return &Stats{s.GetSuccess, s.GetFail, s.SetSuccess, s.SetFail,
-		s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail,
-		s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount}
+		s.DeleteSuccess, s.DeleteFail, s.UpdateSuccess, s.UpdateFail, s.CreateSuccess,
+		s.CreateFail, s.CompareAndSwapSuccess, s.CompareAndSwapFail, s.Watchers, s.ExpireCount}
 }
 }
 
 
 // Status() return the statistics info of etcd storage its recent start
 // Status() return the statistics info of etcd storage its recent start
@@ -80,6 +86,10 @@ func (s *Stats) Inc(field int) {
 		atomic.AddUint64(&s.SetSuccess, 1)
 		atomic.AddUint64(&s.SetSuccess, 1)
 	case SetFail:
 	case SetFail:
 		atomic.AddUint64(&s.SetFail, 1)
 		atomic.AddUint64(&s.SetFail, 1)
+	case CreateSuccess:
+		atomic.AddUint64(&s.CreateSuccess, 1)
+	case CreateFail:
+		atomic.AddUint64(&s.CreateFail, 1)
 	case DeleteSuccess:
 	case DeleteSuccess:
 		atomic.AddUint64(&s.DeleteSuccess, 1)
 		atomic.AddUint64(&s.DeleteSuccess, 1)
 	case DeleteFail:
 	case DeleteFail:

+ 13 - 13
store/stats_test.go

@@ -11,16 +11,16 @@ func TestBasicStats(t *testing.T) {
 	keys := GenKeys(rand.Intn(100), 5)
 	keys := GenKeys(rand.Intn(100), 5)
 
 
 	var i uint64
 	var i uint64
-	var GetSuccess, GetFail, SetSuccess, SetFail, DeleteSuccess, DeleteFail uint64
+	var GetSuccess, GetFail, CreateSuccess, CreateFail, DeleteSuccess, DeleteFail uint64
 	var UpdateSuccess, UpdateFail, CompareAndSwapSuccess, CompareAndSwapFail, watcher_number uint64
 	var UpdateSuccess, UpdateFail, CompareAndSwapSuccess, CompareAndSwapFail, watcher_number uint64
 
 
 	for _, k := range keys {
 	for _, k := range keys {
 		i++
 		i++
 		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 		if err != nil {
-			SetFail++
+			CreateFail++
 		} else {
 		} else {
-			SetSuccess++
+			CreateSuccess++
 		}
 		}
 	}
 	}
 
 
@@ -37,7 +37,7 @@ func TestBasicStats(t *testing.T) {
 
 
 	for _, k := range keys {
 	for _, k := range keys {
 		i++
 		i++
-		_, err := s.update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
+		_, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
 		if err != nil {
 		if err != nil {
 			UpdateFail++
 			UpdateFail++
 		} else {
 		} else {
@@ -108,12 +108,12 @@ func TestBasicStats(t *testing.T) {
 		t.Fatalf("GetFail [%d] != Stats.GetFail [%d]", GetFail, s.Stats.GetFail)
 		t.Fatalf("GetFail [%d] != Stats.GetFail [%d]", GetFail, s.Stats.GetFail)
 	}
 	}
 
 
-	if SetSuccess != s.Stats.SetSuccess {
-		t.Fatalf("SetSuccess [%d] != Stats.SetSuccess [%d]", SetSuccess, s.Stats.SetSuccess)
+	if CreateSuccess != s.Stats.CreateSuccess {
+		t.Fatalf("CreateSuccess [%d] != Stats.CreateSuccess [%d]", CreateSuccess, s.Stats.CreateSuccess)
 	}
 	}
 
 
-	if SetFail != s.Stats.SetFail {
-		t.Fatalf("SetFail [%d] != Stats.SetFail [%d]", SetFail, s.Stats.SetFail)
+	if CreateFail != s.Stats.CreateFail {
+		t.Fatalf("CreateFail [%d] != Stats.CreateFail [%d]", CreateFail, s.Stats.CreateFail)
 	}
 	}
 
 
 	if DeleteSuccess != s.Stats.DeleteSuccess {
 	if DeleteSuccess != s.Stats.DeleteSuccess {
@@ -141,22 +141,22 @@ func TestBasicStats(t *testing.T) {
 	}
 	}
 
 
 	s = newStore()
 	s = newStore()
-	SetSuccess = 0
-	SetFail = 0
+	CreateSuccess = 0
+	CreateFail = 0
 
 
 	for _, k := range keys {
 	for _, k := range keys {
 		i++
 		i++
 		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1)
 		_, err := s.Create(k, "bar", false, time.Now().Add(time.Second*3), i, 1)
 		if err != nil {
 		if err != nil {
-			SetFail++
+			CreateFail++
 		} else {
 		} else {
-			SetSuccess++
+			CreateSuccess++
 		}
 		}
 	}
 	}
 
 
 	time.Sleep(6 * time.Second)
 	time.Sleep(6 * time.Second)
 
 
-	ExpireCount := SetSuccess
+	ExpireCount := CreateSuccess
 
 
 	if ExpireCount != s.Stats.ExpireCount {
 	if ExpireCount != s.Stats.ExpireCount {
 		t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount)
 		t.Fatalf("ExpireCount [%d] != Stats.ExpireCount [%d]", ExpireCount, s.Stats.ExpireCount)

+ 22 - 14
store/store.go

@@ -16,6 +16,7 @@ import (
 type Store interface {
 type Store interface {
 	Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
 	Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error)
 	Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
 	Set(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error)
+	Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error)
 	Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time,
 	Create(nodePath string, value string, incrementalSuffix bool, expireTime time.Time,
 		index uint64, term uint64) (*Event, error)
 		index uint64, term uint64) (*Event, error)
 	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
 	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
@@ -113,7 +114,15 @@ func (s *store) Create(nodePath string, value string, unique bool,
 
 
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
-	return s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create)
+	e, err := s.internalCreate(nodePath, value, unique, false, expireTime, index, term, Create)
+
+	if err == nil {
+		s.Stats.Inc(CreateSuccess)
+	} else {
+		s.Stats.Inc(CreateFail)
+	}
+
+	return e, err
 }
 }
 
 
 // Set function creates or replace the Node at nodePath.
 // Set function creates or replace the Node at nodePath.
@@ -122,7 +131,15 @@ func (s *store) Set(nodePath string, value string, expireTime time.Time, index u
 
 
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
-	return s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set)
+	e, err := s.internalCreate(nodePath, value, false, true, expireTime, index, term, Set)
+
+	if err == nil {
+		s.Stats.Inc(SetSuccess)
+	} else {
+		s.Stats.Inc(SetFail)
+	}
+
+	return e, err
 }
 }
 
 
 func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
 func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
@@ -133,10 +150,6 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
 
 
-	if prevValue == "" && prevIndex == 0 { // try just update
-		return s.update(nodePath, value, expireTime, index, term)
-	}
-
 	n, err := s.internalGet(nodePath, index, term)
 	n, err := s.internalGet(nodePath, index, term)
 
 
 	if err != nil {
 	if err != nil {
@@ -265,7 +278,7 @@ func (s *store) walk(nodePath string, walkFunc func(prev *Node, component string
 // Update function updates the value/ttl of the node.
 // Update function updates the value/ttl of the node.
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
-func (s *store) update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+func (s *store) Update(nodePath string, newValue string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
 	s.worldLock.Lock()
 	s.worldLock.Lock()
 	defer s.worldLock.Unlock()
 	defer s.worldLock.Unlock()
 	nodePath = path.Clean(path.Join("/", nodePath))
 	nodePath = path.Clean(path.Join("/", nodePath))
@@ -354,12 +367,8 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 
 
 	}
 	}
 
 
-	err = d.Add(n)
-
-	if err != nil {
-		s.Stats.Inc(SetFail)
-		return nil, err
-	}
+	// we are sure d is a directory and does not have the children with name n.Name
+	d.Add(n)
 
 
 	// Node with TTL
 	// Node with TTL
 	if expireTime.Sub(Permanent) != 0 {
 	if expireTime.Sub(Permanent) != 0 {
@@ -368,7 +377,6 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
 	}
 	}
 
 
 	s.WatcherHub.notify(e)
 	s.WatcherHub.notify(e)
-	s.Stats.Inc(SetSuccess)
 	return e, nil
 	return e, nil
 }
 }
 
 

+ 5 - 5
store/store_test.go

@@ -74,7 +74,7 @@ func TestUpdateFile(t *testing.T) {
 		t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error())
 		t.Fatalf("cannot create %s=bar [%s]", "/foo/bar", err.Error())
 	}
 	}
 
 
-	_, err = s.update("/foo/bar", "barbar", Permanent, 2, 1)
+	_, err = s.Update("/foo/bar", "barbar", Permanent, 2, 1)
 
 
 	if err != nil {
 	if err != nil {
 		t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error())
 		t.Fatalf("cannot update %s=barbar [%s]", "/foo/bar", err.Error())
@@ -114,7 +114,7 @@ func TestUpdateFile(t *testing.T) {
 	}
 	}
 
 
 	expire := time.Now().Add(time.Second * 2)
 	expire := time.Now().Add(time.Second * 2)
-	_, err = s.update("/foo/foo", "", expire, 7, 1)
+	_, err = s.Update("/foo/foo", "", expire, 7, 1)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("cannot update dir [%s] [%s]", "/foo/foo", err.Error())
 		t.Fatalf("cannot update dir [%s] [%s]", "/foo/foo", err.Error())
 	}
 	}
@@ -331,7 +331,7 @@ func TestWatch(t *testing.T) {
 	}
 	}
 
 
 	c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1)
 	c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1)
-	s.update("/foo/foo/foo", "car", Permanent, 2, 1)
+	s.Update("/foo/foo/foo", "car", Permanent, 2, 1)
 	e = nonblockingRetrive(c)
 	e = nonblockingRetrive(c)
 	if e.Key != "/foo/foo/foo" || e.Action != Update {
 	if e.Key != "/foo/foo/foo" || e.Action != Update {
 		t.Fatal("watch for Update node fails ", e)
 		t.Fatal("watch for Update node fails ", e)
@@ -360,7 +360,7 @@ func TestWatch(t *testing.T) {
 	}
 	}
 
 
 	c, _ = s.Watch("/foo", true, 0, 5, 1)
 	c, _ = s.Watch("/foo", true, 0, 5, 1)
-	s.update("/foo/foo/boo", "foo", Permanent, 6, 1)
+	s.Update("/foo/foo/boo", "foo", Permanent, 6, 1)
 	e = nonblockingRetrive(c)
 	e = nonblockingRetrive(c)
 	if e.Key != "/foo/foo/boo" || e.Action != Update {
 	if e.Key != "/foo/foo/boo" || e.Action != Update {
 		t.Fatal("watch for Update subdirectory fails")
 		t.Fatal("watch for Update subdirectory fails")
@@ -390,7 +390,7 @@ func TestWatch(t *testing.T) {
 	}
 	}
 
 
 	s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1)
 	s.Create("/foo/foo/boo", "foo", false, Permanent, 10, 1)
-	s.update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
+	s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
 	c, _ = s.Watch("/foo", true, 0, 11, 1)
 	c, _ = s.Watch("/foo", true, 0, 11, 1)
 	time.Sleep(time.Second * 2)
 	time.Sleep(time.Second * 2)
 	e = nonblockingRetrive(c)
 	e = nonblockingRetrive(c)

+ 37 - 0
store/update_command.go

@@ -0,0 +1,37 @@
+package store
+
+import (
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/go-raft"
+	"time"
+)
+
+func init() {
+	raft.RegisterCommand(&UpdateCommand{})
+}
+
+// Update command
+type UpdateCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+}
+
+// The name of the update command in the log
+func (c *UpdateCommand) CommandName() string {
+	return "etcd:update"
+}
+
+// Create node
+func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) {
+	s, _ := server.StateMachine().(Store)
+
+	e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		log.Debug(err)
+		return nil, err
+	}
+
+	return e, nil
+}