Browse Source

use new store system

Xiang Li 12 years ago
parent
commit
3ff100321c
9 changed files with 249 additions and 108 deletions
  1. 90 20
      command.go
  2. 7 2
      etcd.go
  3. 107 69
      etcd_handlers.go
  4. 3 2
      etcd_test.go
  5. 24 7
      file_system/file_system.go
  6. 8 4
      file_system/file_system_test.go
  7. 2 1
      raft_handlers.go
  8. 3 0
      raft_server.go
  9. 5 3
      util.go

+ 90 - 20
command.go

@@ -4,12 +4,12 @@ import (
 	"encoding/binary"
 	"encoding/json"
 	"fmt"
-	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/go-raft"
 	"os"
 	"path"
 	"time"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/go-raft"
 )
 
 const commandPrefix = "etcd:"
@@ -24,6 +24,54 @@ type Command interface {
 	Apply(server *raft.Server) (interface{}, error)
 }
 
+// Create command
+type CreateCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+}
+
+// The name of the create command in the log
+func (c *CreateCommand) CommandName() string {
+	return commandName("create")
+}
+
+// Create node
+func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
+	e, err := etcdFs.Create(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		debug(err)
+		return nil, err
+	}
+
+	return json.Marshal(e)
+}
+
+// Update command
+type UpdateCommand struct {
+	Key        string    `json:"key"`
+	Value      string    `json:"value"`
+	ExpireTime time.Time `json:"expireTime"`
+}
+
+// The name of the update command in the log
+func (c *UpdateCommand) CommandName() string {
+	return commandName("update")
+}
+
+// Update node
+func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
+	e, err := etcdFs.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		debug(err)
+		return nil, err
+	}
+
+	return json.Marshal(e)
+}
+
 // Set command
 type SetCommand struct {
 	Key        string    `json:"key"`
@@ -45,8 +93,9 @@ func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
 type TestAndSetCommand struct {
 	Key        string    `json:"key"`
 	Value      string    `json:"value"`
-	PrevValue  string    `json: prevValue`
 	ExpireTime time.Time `json:"expireTime"`
+	PrevValue  string    `json: prevValue`
+	PrevIndex  uint64    `json: prevValue`
 }
 
 // The name of the testAndSet command in the log
@@ -56,12 +105,22 @@ func (c *TestAndSetCommand) CommandName() string {
 
 // Set the key-value pair if the current value of the key equals to the given prevValue
 func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
-	return etcdStore.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
+	e, err := etcdFs.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
+		c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		debug(err)
+		return nil, err
+	}
+
+	return json.Marshal(e)
 }
 
 // Get command
 type GetCommand struct {
-	Key string `json:"key"`
+	Key       string `json:"key"`
+	Recursive bool   `json:"recursive"`
+	Sorted    bool   `json:"sorted"`
 }
 
 // The name of the get command in the log
@@ -71,12 +130,20 @@ func (c *GetCommand) CommandName() string {
 
 // Get the value of key
 func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
-	return etcdStore.Get(c.Key)
+	e, err := etcdFs.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		debug(err)
+		return nil, err
+	}
+
+	return json.Marshal(e)
 }
 
 // Delete command
 type DeleteCommand struct {
-	Key string `json:"key"`
+	Key       string `json:"key"`
+	Recursive bool   `json:"recursive"`
 }
 
 // The name of the delete command in the log
@@ -86,13 +153,21 @@ func (c *DeleteCommand) CommandName() string {
 
 // Delete the key
 func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
-	return etcdStore.Delete(c.Key, server.CommitIndex())
+	e, err := etcdFs.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
+
+	if err != nil {
+		debug(err)
+		return nil, err
+	}
+
+	return json.Marshal(e)
 }
 
 // Watch command
 type WatchCommand struct {
 	Key        string `json:"key"`
 	SinceIndex uint64 `json:"sinceIndex"`
+	Recursive  bool   `json:"recursive"`
 }
 
 // The name of the watch command in the log
@@ -101,20 +176,15 @@ func (c *WatchCommand) CommandName() string {
 }
 
 func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
-	// create a new watcher
-	watcher := store.NewWatcher()
-
-	// add to the watchers list
-	etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex)
+	eventChan, err := etcdFs.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term())
 
-	// wait for the notification for any changing
-	res := <-watcher.C
-
-	if res == nil {
-		return nil, fmt.Errorf("Clearing watch")
+	if err != nil {
+		return nil, err
 	}
 
-	return json.Marshal(res)
+	e := <-eventChan
+
+	return json.Marshal(e)
 }
 
 // JoinCommand

+ 7 - 2
etcd.go

@@ -3,12 +3,14 @@ package main
 import (
 	"crypto/tls"
 	"flag"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/go-raft"
 	"io/ioutil"
 	"os"
 	"strings"
 	"time"
+
+	"github.com/coreos/etcd/file_system"
+	"github.com/coreos/etcd/store"
+	"github.com/coreos/go-raft"
 )
 
 //------------------------------------------------------------------------------
@@ -129,6 +131,7 @@ type TLSConfig struct {
 //------------------------------------------------------------------------------
 
 var etcdStore *store.Store
+var etcdFs *fileSystem.FileSystem
 
 //------------------------------------------------------------------------------
 //
@@ -195,6 +198,8 @@ func main() {
 
 	// Create etcd key-value store
 	etcdStore = store.CreateStore(maxSize)
+	etcdFs = fileSystem.New()
+
 	snapConf = newSnapshotConf()
 
 	// Create etcd and raft server

+ 107 - 69
etcd_handlers.go

@@ -2,12 +2,12 @@ package main
 
 import (
 	"fmt"
-	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/go-raft"
 	"net/http"
 	"strconv"
 	"strings"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/go-raft"
 )
 
 //-------------------------------------------------------------------
@@ -18,7 +18,6 @@ func NewEtcdMuxer() *http.ServeMux {
 	// external commands
 	etcdMux := http.NewServeMux()
 	etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
-	etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
 	etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
 	etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
 	etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
@@ -47,15 +46,16 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error {
 	case "GET":
 		return GetHttpHandler(w, req)
 	case "POST":
-		return SetHttpHandler(w, req)
+		return CreateHttpHandler(w, req)
 	case "PUT":
-		return SetHttpHandler(w, req)
+		return UpdateHttpHandler(w, req)
 	case "DELETE":
 		return DeleteHttpHandler(w, req)
 	default:
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		return nil
 	}
+	return nil
 }
 
 //--------------------------------------
@@ -63,63 +63,102 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error {
 // Set/Delete will dispatch to leader
 //--------------------------------------
 
-// Set Command Handler
-func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := req.URL.Path[len("/v1/keys/"):]
-
-	if store.CheckKeyword(key) {
-		return etcdErr.NewError(etcdErr.EcodeKeyIsPreserved, "Set")
-	}
+func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
+	key := req.URL.Path[len("/v2/keys"):]
 
-	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
+	debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 	value := req.FormValue("value")
 
-	if len(value) == 0 {
-		return etcdErr.NewError(etcdErr.EcodeValueRequired, "Set")
+	ttl := req.FormValue("ttl")
+
+	expireTime, err := durationToExpireTime(ttl)
+
+	if err != nil {
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create")
 	}
 
-	prevValue := req.FormValue("prevValue")
+	command := &CreateCommand{
+		Key:        key,
+		Value:      value,
+		ExpireTime: expireTime,
+	}
+
+	return dispatch(command, w, req, true)
+
+}
 
-	strDuration := req.FormValue("ttl")
+func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
+	key := req.URL.Path[len("/v2/keys"):]
 
-	expireTime, err := durationToExpireTime(strDuration)
+	debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+
+	value := req.FormValue("value")
+
+	ttl := req.FormValue("ttl")
+
+	expireTime, err := durationToExpireTime(ttl)
 
 	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Set")
+		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update")
 	}
 
-	if len(prevValue) != 0 {
-		command := &TestAndSetCommand{
+	// TODO: update should give at least one option
+	if value == "" && ttl == "" {
+		return nil
+	}
+
+	prevValue := req.FormValue("prevValue")
+
+	prevIndexStr := req.FormValue("prevIndex")
+
+	if prevValue == "" && prevIndexStr == "" { // update without test
+		command := &UpdateCommand{
 			Key:        key,
 			Value:      value,
-			PrevValue:  prevValue,
 			ExpireTime: expireTime,
 		}
 
 		return dispatch(command, w, req, true)
 
-	} else {
-		command := &SetCommand{
-			Key:        key,
-			Value:      value,
-			ExpireTime: expireTime,
+	} else { // update with test
+		var prevIndex uint64
+
+		if prevIndexStr != "" {
+			prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
+		}
+
+		// TODO: add error type
+		if err != nil {
+			return nil
+		}
+
+		command := &TestAndSetCommand{
+			Key:       key,
+			Value:     value,
+			PrevValue: prevValue,
+			PrevIndex: prevIndex,
 		}
 
 		return dispatch(command, w, req, true)
 	}
+
 }
 
 // Delete Handler
 func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := req.URL.Path[len("/v1/keys/"):]
+	key := req.URL.Path[len("/v2/keys"):]
 
-	debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
+	debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 	command := &DeleteCommand{
 		Key: key,
 	}
 
+	if req.FormValue("recursive") == "true" {
+		command.Recursive = true
+	}
+
 	return dispatch(command, w, req, true)
 }
 
@@ -212,64 +251,63 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	return nil
 }
 
-// Get Handler
 func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := req.URL.Path[len("/v1/keys/"):]
+	var err error
+	var event interface{}
+	key := req.URL.Path[len("/v1/keys"):]
 
-	debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
+	debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
-	command := &GetCommand{
-		Key: key,
-	}
+	recursive := req.FormValue("recursive")
 
-	if body, err := command.Apply(r.Server); err != nil {
-		return err
-	} else {
-		body, _ := body.([]byte)
-		w.WriteHeader(http.StatusOK)
-		w.Write(body)
+	if req.FormValue("wait") == "true" {
+		command := &WatchCommand{
+			Key: key,
+		}
 
-		return nil
-	}
+		if recursive == "true" {
+			command.Recursive = true
+		}
 
-}
+		indexStr := req.FormValue("wait_index")
 
-// Watch handler
-func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := req.URL.Path[len("/v1/watch/"):]
+		if indexStr != "" {
+			sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
 
-	command := &WatchCommand{
-		Key: key,
-	}
+			if err != nil {
+				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index")
+			}
 
-	if req.Method == "GET" {
-		debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
-		command.SinceIndex = 0
+			command.SinceIndex = sinceIndex
+		}
 
-	} else if req.Method == "POST" {
-		// watch from a specific index
+		event, err = command.Apply(r.Server)
 
-		debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
-		content := req.FormValue("index")
+	} else {
+		command := &GetCommand{
+			Key: key,
+		}
 
-		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
-		if err != nil {
-			return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index")
+		sorted := req.FormValue("sorted")
+
+		if sorted == "true" {
+			command.Sorted = true
 		}
-		command.SinceIndex = sinceIndex
 
-	} else {
-		w.WriteHeader(http.StatusMethodNotAllowed)
-		return nil
+		if recursive == "true" {
+			command.Recursive = true
+		}
+
+		event, err = command.Apply(r.Server)
 	}
 
-	if body, err := command.Apply(r.Server); err != nil {
-		return etcdErr.NewError(etcdErr.EcodeWatcherCleared, key)
+	if err != nil {
+		return err
 	} else {
+		event, _ := event.([]byte)
 		w.WriteHeader(http.StatusOK)
+		w.Write(event)
 
-		body, _ := body.([]byte)
-		w.Write(body)
 		return nil
 	}
 

+ 3 - 2
etcd_test.go

@@ -2,8 +2,6 @@ package main
 
 import (
 	"fmt"
-	"github.com/coreos/etcd/test"
-	"github.com/coreos/go-etcd/etcd"
 	"math/rand"
 	"net/http"
 	"net/http/httptest"
@@ -13,6 +11,9 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	"github.com/coreos/etcd/test"
+	"github.com/coreos/go-etcd/etcd"
 )
 
 // Create a single node and try to set value

+ 24 - 7
file_system/file_system.go

@@ -27,6 +27,8 @@ func New() *FileSystem {
 }
 
 func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64, term uint64) (*Event, error) {
+	nodePath = path.Clean(path.Join("/", nodePath))
+
 	n, err := fs.InternalGet(nodePath, index, term)
 
 	if err != nil {
@@ -71,6 +73,11 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64,
 		e.Value = n.Value
 	}
 
+	if n.ExpireTime.Sub(Permanent) != 0 {
+		e.Expiration = &n.ExpireTime
+		e.TTL = int64(n.ExpireTime.Sub(time.Now())/time.Second) + 1
+	}
+
 	return e, nil
 }
 
@@ -78,7 +85,7 @@ func (fs *FileSystem) Get(nodePath string, recursive, sorted bool, index uint64,
 // If the node has already existed, create will fail.
 // If any node on the path is a file, create will fail.
 func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
-	nodePath = path.Clean("/" + nodePath)
+	nodePath = path.Clean(path.Join("/", nodePath))
 
 	// make sure we can create the node
 	_, err := fs.InternalGet(nodePath, index, term)
@@ -125,10 +132,10 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time
 	}
 
 	// Node with TTL
-	if expireTime != Permanent {
+	if expireTime.Sub(Permanent) != 0 {
 		n.Expire()
 		e.Expiration = &n.ExpireTime
-		e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
+		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 	}
 
 	fs.WatcherHub.notify(e)
@@ -164,7 +171,7 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time
 	}
 
 	// update ttl
-	if !n.IsPermanent() && expireTime != Permanent {
+	if !n.IsPermanent() {
 		n.stopExpire <- true
 	}
 
@@ -172,7 +179,7 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time
 		n.ExpireTime = expireTime
 		n.Expire()
 		e.Expiration = &n.ExpireTime
-		e.TTL = int64(expireTime.Sub(time.Now()) / time.Second)
+		e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 	}
 
 	fs.WatcherHub.notify(e)
@@ -205,7 +212,7 @@ func (fs *FileSystem) TestAndSet(nodePath string, prevValue string, prevIndex ui
 		return e, nil
 	}
 
-	cause := fmt.Sprintf("[%v/%v] [%v/%v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
+	cause := fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, f.Value, prevIndex, f.ModifiedIndex)
 	return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause)
 }
 
@@ -241,6 +248,16 @@ func (fs *FileSystem) Delete(nodePath string, recursive bool, index uint64, term
 	return e, nil
 }
 
+func (fs *FileSystem) Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error) {
+	fs.Index, fs.Term = index, term
+
+	if sinceIndex == 0 {
+		return fs.WatcherHub.watch(prefix, recursive, index+1)
+	}
+
+	return fs.WatcherHub.watch(prefix, recursive, sinceIndex)
+}
+
 // walk function walks all the nodePath and apply the walkFunc on each directory
 func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component string) (*Node, error)) (*Node, error) {
 	components := strings.Split(nodePath, "/")
@@ -265,7 +282,7 @@ func (fs *FileSystem) walk(nodePath string, walkFunc func(prev *Node, component
 
 // InternalGet function get the node of the given nodePath.
 func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (*Node, error) {
-	nodePath = path.Clean("/" + nodePath)
+	nodePath = path.Clean(path.Join("/", nodePath))
 
 	// update file system known index and term
 	fs.Index, fs.Term = index, term

+ 8 - 4
file_system/file_system_test.go

@@ -10,10 +10,7 @@ import (
 func TestCreateAndGet(t *testing.T) {
 	fs := New()
 
-	// this should create successfully
-	createAndGet(fs, "/foobar", t)
-	createAndGet(fs, "/foo/bar", t)
-	createAndGet(fs, "/foo/foo/bar", t)
+	fs.Create("/foobar", "bar", Permanent, 1, 1)
 
 	// already exist, create should fail
 	_, err := fs.Create("/foobar", "bar", Permanent, 1, 1)
@@ -22,6 +19,13 @@ func TestCreateAndGet(t *testing.T) {
 		t.Fatal("Create should fail")
 	}
 
+	fs.Delete("/foobar", true, 1, 1)
+
+	// this should create successfully
+	createAndGet(fs, "/foobar", t)
+	createAndGet(fs, "/foo/bar", t)
+	createAndGet(fs, "/foo/foo/bar", t)
+
 	// meet file, create should fail
 	_, err = fs.Create("/foo/bar/bar", "bar", Permanent, 2, 1)
 

+ 2 - 1
raft_handlers.go

@@ -2,8 +2,9 @@ package main
 
 import (
 	"encoding/json"
-	"github.com/coreos/go-raft"
 	"net/http"
+
+	"github.com/coreos/go-raft"
 )
 
 //-------------------------------------------------------------

+ 3 - 0
raft_server.go

@@ -277,4 +277,7 @@ func registerCommands() {
 	raft.RegisterCommand(&DeleteCommand{})
 	raft.RegisterCommand(&WatchCommand{})
 	raft.RegisterCommand(&TestAndSetCommand{})
+
+	raft.RegisterCommand(&CreateCommand{})
+	raft.RegisterCommand(&UpdateCommand{})
 }

+ 5 - 3
util.go

@@ -3,7 +3,6 @@ package main
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/coreos/etcd/web"
 	"io"
 	"log"
 	"net"
@@ -14,6 +13,9 @@ import (
 	"runtime/pprof"
 	"strconv"
 	"time"
+
+	"github.com/coreos/etcd/file_system"
+	"github.com/coreos/etcd/web"
 )
 
 //--------------------------------------
@@ -26,12 +28,12 @@ func durationToExpireTime(strDuration string) (time.Time, error) {
 		duration, err := strconv.Atoi(strDuration)
 
 		if err != nil {
-			return time.Unix(0, 0), err
+			return fileSystem.Permanent, err
 		}
 		return time.Now().Add(time.Second * (time.Duration)(duration)), nil
 
 	} else {
-		return time.Unix(0, 0), nil
+		return fileSystem.Permanent, nil
 	}
 }