Browse Source

Merge pull request #4571 from heyitsanthony/txn-interactive

etcdctlv3: improve txn interactive mode
Anthony Romano 10 years ago
parent
commit
d8b124cf3a

+ 69 - 3
etcdctlv3/README.md

@@ -15,7 +15,7 @@ PUT assigns the specified value with the specified key. If key already holds a v
 
 Simple reply
 
-- OK if PUT executed correctly. Exit code is zero. 
+- OK if PUT executed correctly. Exit code is zero.
 
 - Error string if PUT failed. Exit code is non-zero.
 
@@ -93,7 +93,7 @@ TODO: --prefix, --from
 
 Simple reply
 
-- The number of keys that were removed in decimal if DEL executed correctly. Exit code is zero. 
+- The number of keys that were removed in decimal if DEL executed correctly. Exit code is zero.
 
 - Error string if DEL failed. Exit code is non-zero.
 
@@ -109,6 +109,72 @@ OK
 ./etcdctl range foo
 ```
 
+### TXN [options]
+
+TXN applies multiple etcd requests as a single atomic transaction. A transaction consists of list of conditions, a list of requests to apply if all the conditions are true, and a list of requests to apply if any condition is false.
+
+#### Options
+
+- hex -- print out keys and values as hex encoded string
+
+- interactive -- input transaction with interactive mode
+
+#### Input Format
+Interactive mode:
+```ebnf
+<Txn> ::= <CMP>* "\n" <THEN> "\n" <ELSE> "\n"
+<CMP> ::= (<CMPCREATE>|<CMPMOD>|<CMPVAL>|<CMPVER>) "\n"
+<CMPOP> ::= "<" | "=" | ">"
+<CMPCREATE> := ("c"|"create")"("<KEY>")" <REVISION>
+<CMPMOD> ::= ("m"|"mod")"("<KEY>")" <CMPOP> <REVISION>
+<CMPVAL> ::= ("val"|"value")"("<KEY>")" <CMPOP> <VALUE>
+<CMPVER> ::= ("ver"|"version")"("<KEY>")" <CMPOP> <VERSION>
+<THEN> ::= <OP>*
+<ELSE> ::= <OP>*
+<OP> ::= ((see put, get, del etcdctl command syntax)) "\n"
+<KEY> ::= (%q formatted string)
+<VALUE> ::= (%q formatted string)
+<REVISION> ::= "\""[0-9]+"\""
+<VERSION> ::= "\""[0-9]+"\""
+```
+
+TODO: non-interactive mode
+
+#### Return value
+
+Simple reply
+
+- SUCCESS if etcd processed the transaction success list, FAILURE if etcd processed the transaction failure list.
+
+- Simple reply for each command executed request list, each separated by a blank line.
+
+- Additional error string if TXN failed. Exit code is non-zero.
+
+TODO: probably json and binary encoded proto
+
+#### Examples
+
+``` bash
+./etcdctl txn -i
+mod("key1") > "0"
+
+put key1 "overwrote-key1"
+
+put key1 "created-key1"
+put key2 "some extra key"
+
+FAILURE
+
+OK
+
+OK
+```
+
+#### Notes
+
+TODO: non-interactive mode
+
+
 ### WATCH [options] [key or prefix]
 
 Watch watches events stream on keys or prefixes. The watch command runs until it encounters an error or is terminated by the user.
@@ -117,7 +183,7 @@ Watch watches events stream on keys or prefixes. The watch command runs until it
 
 - hex -- print out key and value as hex encode string
 
-- i -- begins an interactive watch session
+- interactive -- begins an interactive watch session
 
 - prefix -- watch on a prefix if prefix is set.
 

+ 14 - 7
etcdctlv3/command/del_command.go

@@ -33,22 +33,29 @@ func NewDelCommand() *cobra.Command {
 
 // delCommandFunc executes the "del" command.
 func delCommandFunc(cmd *cobra.Command, args []string) {
+	key, opts := getDelOp(cmd, args)
+	c := mustClientFromCmd(cmd)
+	kvapi := clientv3.NewKV(c)
+	resp, err := kvapi.Delete(context.TODO(), key, opts...)
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	printDeleteResponse(*resp)
+}
+
+func getDelOp(cmd *cobra.Command, args []string) (string, []clientv3.OpOption) {
 	if len(args) == 0 || len(args) > 2 {
 		ExitWithError(ExitBadArgs, fmt.Errorf("del command needs one argument as key and an optional argument as range_end."))
 	}
-
 	opts := []clientv3.OpOption{}
 	key := args[0]
 	if len(args) > 1 {
 		opts = append(opts, clientv3.WithRange(args[1]))
 	}
+	return key, opts
+}
 
-	c := mustClientFromCmd(cmd)
-	kvapi := clientv3.NewKV(c)
-	_, err := kvapi.Delete(context.TODO(), key, opts...)
-	if err != nil {
-		ExitWithError(ExitError, err)
-	}
+func printDeleteResponse(resp clientv3.DeleteResponse) {
 	// TODO: add number of key removed into the response of delete.
 	// TODO: print out the number of removed keys.
 	fmt.Println(0)

+ 15 - 8
etcdctlv3/command/get_command.go

@@ -50,6 +50,17 @@ func NewGetCommand() *cobra.Command {
 
 // getCommandFunc executes the "get" command.
 func getCommandFunc(cmd *cobra.Command, args []string) {
+	key, opts := getGetOp(cmd, args)
+	c := mustClientFromCmd(cmd)
+	kvapi := clientv3.NewKV(c)
+	resp, err := kvapi.Get(context.TODO(), key, opts...)
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	printGetResponse(*resp, getHex)
+}
+
+func getGetOp(cmd *cobra.Command, args []string) (string, []clientv3.OpOption) {
 	if len(args) == 0 {
 		ExitWithError(ExitBadArgs, fmt.Errorf("range command needs arguments."))
 	}
@@ -94,15 +105,11 @@ func getCommandFunc(cmd *cobra.Command, args []string) {
 	}
 
 	opts = append(opts, clientv3.WithSort(sortByTarget, sortByOrder))
+	return key, opts
+}
 
-	c := mustClientFromCmd(cmd)
-	kvapi := clientv3.NewKV(c)
-	resp, err := kvapi.Get(context.TODO(), key, opts...)
-	if err != nil {
-		ExitWithError(ExitError, err)
-	}
-
+func printGetResponse(resp clientv3.GetResponse, isHex bool) {
 	for _, kv := range resp.Kvs {
-		printKV(getHex, kv)
+		printKV(isHex, kv)
 	}
 }

+ 20 - 5
etcdctlv3/command/put_command.go

@@ -56,6 +56,18 @@ will store the content of the file to <key>.
 
 // putCommandFunc executes the "put" command.
 func putCommandFunc(cmd *cobra.Command, args []string) {
+	key, value, opts := getPutOp(cmd, args)
+
+	c := mustClientFromCmd(cmd)
+	kvapi := clientv3.NewKV(c)
+	resp, err := kvapi.Put(context.TODO(), key, value, opts...)
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	printPutResponse(*resp)
+}
+
+func getPutOp(cmd *cobra.Command, args []string) (string, string, []clientv3.OpOption) {
 	if len(args) == 0 {
 		ExitWithError(ExitBadArgs, fmt.Errorf("put command needs 1 argument and input from stdin or 2 arguments."))
 	}
@@ -71,11 +83,14 @@ func putCommandFunc(cmd *cobra.Command, args []string) {
 		ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID (%v), expecting ID in Hex", err))
 	}
 
-	c := mustClientFromCmd(cmd)
-	kvapi := clientv3.NewKV(c)
-	_, err = kvapi.Put(context.TODO(), key, value, clientv3.WithLease(lease.LeaseID(id)))
-	if err != nil {
-		ExitWithError(ExitError, err)
+	opts := []clientv3.OpOption{}
+	if id != 0 {
+		opts = append(opts, clientv3.WithLease(lease.LeaseID(id)))
 	}
+
+	return key, value, opts
+}
+
+func printPutResponse(resp clientv3.PutResponse) {
 	fmt.Println("OK")
 }

+ 145 - 151
etcdctlv3/command/txn_command.go

@@ -18,21 +18,31 @@ import (
 	"bufio"
 	"fmt"
 	"os"
+	"regexp"
 	"strconv"
 	"strings"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/clientv3"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
+var (
+	txnInteractive bool
+	txnHex         bool
+)
+
 // NewTxnCommand returns the cobra command for "txn".
 func NewTxnCommand() *cobra.Command {
-	return &cobra.Command{
-		Use:   "txn",
+	cmd := &cobra.Command{
+		Use:   "txn [options]",
 		Short: "Txn processes all the requests in one transaction.",
 		Run:   txnCommandFunc,
 	}
+	cmd.Flags().BoolVarP(&txnInteractive, "interactive", "i", false, "input transaction in interactive mode")
+	cmd.Flags().BoolVar(&txnHex, "hex", false, "print out key-values as hex encoded strings")
+	return cmd
 }
 
 // txnCommandFunc executes the "txn" command.
@@ -41,198 +51,182 @@ func txnCommandFunc(cmd *cobra.Command, args []string) {
 		ExitWithError(ExitBadArgs, fmt.Errorf("txn command does not accept argument."))
 	}
 
+	if !txnInteractive {
+		ExitWithError(ExitBadFeature, fmt.Errorf("txn command only supports interactive mode"))
+	}
+
 	reader := bufio.NewReader(os.Stdin)
 
-	next := compareState
-	txn := &pb.TxnRequest{}
-	for next != nil {
-		next = next(txn, reader)
-	}
+	txn := clientv3.NewKV(mustClientFromCmd(cmd)).Txn(context.Background())
+	fmt.Println("compares:")
+	txn.If(readCompares(reader)...)
+	fmt.Println("success requests (get, put, delete):")
+	txn.Then(readOps(reader)...)
+	fmt.Println("failure requests (get, put, delete):")
+	txn.Else(readOps(reader)...)
 
-	resp, err := mustClientFromCmd(cmd).KV.Txn(context.Background(), txn)
+	resp, err := txn.Commit()
 	if err != nil {
 		ExitWithError(ExitError, err)
 	}
-	if resp.Succeeded {
-		fmt.Println("executed success request list")
-	} else {
-		fmt.Println("executed failure request list")
-	}
-}
 
-type stateFunc func(txn *pb.TxnRequest, r *bufio.Reader) stateFunc
+	printTxnResponse(*resp, txnHex)
+}
 
-func compareState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc {
-	fmt.Println("entry comparison[key target expected_result compare_value] (end with empty line):")
+func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
+	for {
+		line, err := r.ReadString('\n')
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
+		}
+		if len(line) == 1 {
+			break
+		}
 
-	line, err := r.ReadString('\n')
-	if err != nil {
-		ExitWithError(ExitInvalidInput, err)
+		// remove trialling \n
+		line = line[:len(line)-1]
+		cmp, err := parseCompare(line)
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
+		}
+		cmps = append(cmps, *cmp)
 	}
 
-	if len(line) == 1 {
-		return successState
-	}
+	return cmps
+}
 
-	// remove trialling \n
-	line = line[:len(line)-1]
-	c, err := parseCompare(line)
-	if err != nil {
-		ExitWithError(ExitInvalidInput, err)
-	}
+func readOps(r *bufio.Reader) (ops []clientv3.Op) {
+	for {
+		line, err := r.ReadString('\n')
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
+		}
+		if len(line) == 1 {
+			break
+		}
 
-	txn.Compare = append(txn.Compare, c)
+		// remove trialling \n
+		line = line[:len(line)-1]
+		op, err := parseRequestUnion(line)
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
+		}
+		ops = append(ops, *op)
+	}
 
-	return compareState
+	return ops
 }
 
-func successState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc {
-	fmt.Println("entry success request[method key value(end_range)] (end with empty line):")
+func argify(s string) []string {
+	r := regexp.MustCompile("'.+'|\".+\"|\\S+")
+	return r.FindAllString(s, -1)
+}
 
-	line, err := r.ReadString('\n')
-	if err != nil {
-		ExitWithError(ExitInvalidInput, err)
+func parseRequestUnion(line string) (*clientv3.Op, error) {
+	args := argify(line)
+	if len(args) < 2 {
+		return nil, fmt.Errorf("invalid txn compare request: %s", line)
 	}
 
-	if len(line) == 1 {
-		return failureState
-	}
+	opc := make(chan clientv3.Op, 1)
 
-	// remove trialling \n
-	line = line[:len(line)-1]
-	ru, err := parseRequestUnion(line)
-	if err != nil {
-		ExitWithError(ExitInvalidInput, err)
+	put := NewPutCommand()
+	put.Run = func(cmd *cobra.Command, args []string) {
+		key, value, opts := getPutOp(cmd, args)
+		opc <- clientv3.OpPut(key, value, opts...)
 	}
-
-	txn.Success = append(txn.Success, ru)
-
-	return successState
-}
-
-func failureState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc {
-	fmt.Println("entry failure request[method key value(end_range)] (end with empty line):")
-
-	line, err := r.ReadString('\n')
-	if err != nil {
-		ExitWithError(ExitInvalidInput, err)
+	get := NewGetCommand()
+	get.Run = func(cmd *cobra.Command, args []string) {
+		key, opts := getGetOp(cmd, args)
+		opc <- clientv3.OpGet(key, opts...)
 	}
-
-	if len(line) == 1 {
-		return nil
+	del := NewDelCommand()
+	del.Run = func(cmd *cobra.Command, args []string) {
+		key, opts := getDelOp(cmd, args)
+		opc <- clientv3.OpDelete(key, opts...)
 	}
+	cmds := &cobra.Command{SilenceErrors: true}
+	cmds.AddCommand(put, get, del)
 
-	// remove trialling \n
-	line = line[:len(line)-1]
-	ru, err := parseRequestUnion(line)
-	if err != nil {
-		ExitWithError(ExitInvalidInput, err)
+	cmds.SetArgs(args)
+	if err := cmds.Execute(); err != nil {
+		return nil, fmt.Errorf("invalid txn request: %s", line)
 	}
 
-	txn.Failure = append(txn.Failure, ru)
-
-	return failureState
+	op := <-opc
+	return &op, nil
 }
 
-func parseRequestUnion(line string) (*pb.RequestUnion, error) {
-	parts := strings.Split(line, " ")
-	if len(parts) < 2 {
-		return nil, fmt.Errorf("invalid txn compare request: %s", line)
-	}
+func parseCompare(line string) (*clientv3.Cmp, error) {
+	var (
+		key string
+		op  string
+		val string
+	)
 
-	ru := &pb.RequestUnion{}
-	key := []byte(parts[1])
-	switch parts[0] {
-	case "r", "range":
-		if len(parts) == 3 {
-			ru.Request = &pb.RequestUnion_RequestRange{
-				RequestRange: &pb.RangeRequest{
-					Key:      key,
-					RangeEnd: []byte(parts[2]),
-				}}
-		} else {
-			ru.Request = &pb.RequestUnion_RequestRange{
-				RequestRange: &pb.RangeRequest{
-					Key: key,
-				}}
-		}
-	case "p", "put":
-		ru.Request = &pb.RequestUnion_RequestPut{
-			RequestPut: &pb.PutRequest{
-				Key:   key,
-				Value: []byte(parts[2]),
-			}}
-	case "d", "deleteRange":
-		if len(parts) == 3 {
-			ru.Request = &pb.RequestUnion_RequestDeleteRange{
-				RequestDeleteRange: &pb.DeleteRangeRequest{
-					Key:      key,
-					RangeEnd: []byte(parts[2]),
-				}}
-		} else {
-			ru.Request = &pb.RequestUnion_RequestDeleteRange{
-				RequestDeleteRange: &pb.DeleteRangeRequest{
-					Key: key,
-				}}
-		}
-	default:
-		return nil, fmt.Errorf("invalid txn request: %s", line)
+	lparenSplit := strings.SplitN(line, "(", 2)
+	if len(lparenSplit) != 2 {
+		return nil, fmt.Errorf("malformed comparison: %s", line)
 	}
-	return ru, nil
-}
 
-func parseCompare(line string) (*pb.Compare, error) {
-	parts := strings.Split(line, " ")
-	if len(parts) != 4 {
-		return nil, fmt.Errorf("invalid txn compare request: %s", line)
+	target := lparenSplit[0]
+	n, serr := fmt.Sscanf(lparenSplit[1], "%q) %s %q", &key, &op, &val)
+	if n != 3 {
+		return nil, fmt.Errorf("malformed comparison: %s; got %s(%q) %s %q", line, target, key, op, val)
+	}
+	if serr != nil {
+		return nil, fmt.Errorf("malformed comparison: %s (%v)", line, serr)
 	}
 
-	var err error
-	c := &pb.Compare{}
-	c.Key = []byte(parts[0])
-	switch parts[1] {
+	var (
+		v   int64
+		err error
+		cmp clientv3.Cmp
+	)
+	switch target {
 	case "ver", "version":
-		tv, _ := c.TargetUnion.(*pb.Compare_Version)
-		if tv != nil {
-			tv.Version, err = strconv.ParseInt(parts[3], 10, 64)
-			if err != nil {
-				return nil, fmt.Errorf("invalid txn compare request: %s", line)
-			}
+		if v, err = strconv.ParseInt(val, 10, 64); err == nil {
+			cmp = clientv3.Compare(clientv3.Version(key), op, v)
 		}
 	case "c", "create":
-		tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision)
-		if tv != nil {
-			tv.CreateRevision, err = strconv.ParseInt(parts[3], 10, 64)
-			if err != nil {
-				return nil, fmt.Errorf("invalid txn compare request: %s", line)
-			}
+		if v, err = strconv.ParseInt(val, 10, 64); err == nil {
+			cmp = clientv3.Compare(clientv3.CreatedRevision(key), op, v)
 		}
 	case "m", "mod":
-		tv, _ := c.TargetUnion.(*pb.Compare_ModRevision)
-		if tv != nil {
-			tv.ModRevision, err = strconv.ParseInt(parts[3], 10, 64)
-			if err != nil {
-				return nil, fmt.Errorf("invalid txn compare request: %s", line)
-			}
+		if v, err = strconv.ParseInt(val, 10, 64); err == nil {
+			cmp = clientv3.Compare(clientv3.ModifiedRevision(key), op, v)
 		}
 	case "val", "value":
-		tv, _ := c.TargetUnion.(*pb.Compare_Value)
-		if tv != nil {
-			tv.Value = []byte(parts[3])
-		}
+		cmp = clientv3.Compare(clientv3.Value(key), op, val)
 	default:
-		return nil, fmt.Errorf("invalid txn compare request: %s", line)
+		return nil, fmt.Errorf("malformed comparison: %s (unknown target %s)", line, target)
 	}
 
-	switch parts[2] {
-	case "g", "greater":
-		c.Result = pb.Compare_GREATER
-	case "e", "equal":
-		c.Result = pb.Compare_EQUAL
-	case "l", "less":
-		c.Result = pb.Compare_LESS
-	default:
+	if err != nil {
 		return nil, fmt.Errorf("invalid txn compare request: %s", line)
 	}
-	return c, nil
+
+	return &cmp, nil
+}
+
+func printTxnResponse(resp clientv3.TxnResponse, isHex bool) {
+	if resp.Succeeded {
+		fmt.Println("SUCCESS")
+	} else {
+		fmt.Println("FAILURE")
+	}
+
+	for _, r := range resp.Responses {
+		fmt.Println("")
+		switch v := r.Response.(type) {
+		case *pb.ResponseUnion_ResponseDeleteRange:
+			printDeleteResponse((clientv3.DeleteResponse)(*v.ResponseDeleteRange))
+		case *pb.ResponseUnion_ResponsePut:
+			printPutResponse((clientv3.PutResponse)(*v.ResponsePut))
+		case *pb.ResponseUnion_ResponseRange:
+			printGetResponse(((clientv3.GetResponse)(*v.ResponseRange)), isHex)
+		default:
+			fmt.Printf("unexpected response %+v\n", r)
+		}
+	}
 }

+ 1 - 1
etcdctlv3/command/watch_command.go

@@ -44,7 +44,7 @@ func NewWatchCommand() *cobra.Command {
 	}
 
 	cmd.Flags().BoolVar(&watchHex, "hex", false, "print out key and value as hex encode string for text format")
-	cmd.Flags().BoolVar(&watchInteractive, "i", false, "interactive mode")
+	cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "interactive mode")
 	cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "watch on a prefix if prefix is set")
 	cmd.Flags().Int64Var(&watchRev, "rev", 0, "revision to start watching")