txn_command.go 5.8 KB


  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "bufio"
  17. "fmt"
  18. "os"
  19. "regexp"
  20. "strconv"
  21. "strings"
  22. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
  23. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  24. "github.com/coreos/etcd/clientv3"
  25. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  26. )
  27. var (
  28. txnInteractive bool
  29. txnHex bool
  30. )
  31. // NewTxnCommand returns the cobra command for "txn".
  32. func NewTxnCommand() *cobra.Command {
  33. cmd := &cobra.Command{
  34. Use: "txn [options]",
  35. Short: "Txn processes all the requests in one transaction.",
  36. Run: txnCommandFunc,
  37. }
  38. cmd.Flags().BoolVarP(&txnInteractive, "interactive", "i", false, "input transaction in interactive mode")
  39. cmd.Flags().BoolVar(&txnHex, "hex", false, "print out key-values as hex encoded strings")
  40. return cmd
  41. }
  42. // txnCommandFunc executes the "txn" command.
  43. func txnCommandFunc(cmd *cobra.Command, args []string) {
  44. if len(args) != 0 {
  45. ExitWithError(ExitBadArgs, fmt.Errorf("txn command does not accept argument."))
  46. }
  47. if !txnInteractive {
  48. ExitWithError(ExitBadFeature, fmt.Errorf("txn command only supports interactive mode"))
  49. }
  50. reader := bufio.NewReader(os.Stdin)
  51. txn := clientv3.NewKV(mustClientFromCmd(cmd)).Txn(context.Background())
  52. fmt.Println("compares:")
  53. txn.If(readCompares(reader)...)
  54. fmt.Println("success requests (get, put, delete):")
  55. txn.Then(readOps(reader)...)
  56. fmt.Println("failure requests (get, put, delete):")
  57. txn.Else(readOps(reader)...)
  58. resp, err := txn.Commit()
  59. if err != nil {
  60. ExitWithError(ExitError, err)
  61. }
  62. printTxnResponse(*resp, txnHex)
  63. }
  64. func readCompares(r *bufio.Reader) (cmps []clientv3.Cmp) {
  65. for {
  66. line, err := r.ReadString('\n')
  67. if err != nil {
  68. ExitWithError(ExitInvalidInput, err)
  69. }
  70. if len(line) == 1 {
  71. break
  72. }
  73. // remove trialling \n
  74. line = line[:len(line)-1]
  75. cmp, err := parseCompare(line)
  76. if err != nil {
  77. ExitWithError(ExitInvalidInput, err)
  78. }
  79. cmps = append(cmps, *cmp)
  80. }
  81. return cmps
  82. }
  83. func readOps(r *bufio.Reader) (ops []clientv3.Op) {
  84. for {
  85. line, err := r.ReadString('\n')
  86. if err != nil {
  87. ExitWithError(ExitInvalidInput, err)
  88. }
  89. if len(line) == 1 {
  90. break
  91. }
  92. // remove trialling \n
  93. line = line[:len(line)-1]
  94. op, err := parseRequestUnion(line)
  95. if err != nil {
  96. ExitWithError(ExitInvalidInput, err)
  97. }
  98. ops = append(ops, *op)
  99. }
  100. return ops
  101. }
  102. func argify(s string) []string {
  103. r := regexp.MustCompile("'.+'|\".+\"|\\S+")
  104. return r.FindAllString(s, -1)
  105. }
  106. func parseRequestUnion(line string) (*clientv3.Op, error) {
  107. args := argify(line)
  108. if len(args) < 2 {
  109. return nil, fmt.Errorf("invalid txn compare request: %s", line)
  110. }
  111. opc := make(chan clientv3.Op, 1)
  112. put := NewPutCommand()
  113. put.Run = func(cmd *cobra.Command, args []string) {
  114. key, value, opts := getPutOp(cmd, args)
  115. opc <- clientv3.OpPut(key, value, opts...)
  116. }
  117. get := NewGetCommand()
  118. get.Run = func(cmd *cobra.Command, args []string) {
  119. key, opts := getGetOp(cmd, args)
  120. opc <- clientv3.OpGet(key, opts...)
  121. }
  122. del := NewDelCommand()
  123. del.Run = func(cmd *cobra.Command, args []string) {
  124. key, opts := getDelOp(cmd, args)
  125. opc <- clientv3.OpDelete(key, opts...)
  126. }
  127. cmds := &cobra.Command{SilenceErrors: true}
  128. cmds.AddCommand(put, get, del)
  129. cmds.SetArgs(args)
  130. if err := cmds.Execute(); err != nil {
  131. return nil, fmt.Errorf("invalid txn request: %s", line)
  132. }
  133. op := <-opc
  134. return &op, nil
  135. }
  136. func parseCompare(line string) (*clientv3.Cmp, error) {
  137. var (
  138. key string
  139. op string
  140. val string
  141. )
  142. lparenSplit := strings.SplitN(line, "(", 2)
  143. if len(lparenSplit) != 2 {
  144. return nil, fmt.Errorf("malformed comparison: %s", line)
  145. }
  146. target := lparenSplit[0]
  147. n, serr := fmt.Sscanf(lparenSplit[1], "%q) %s %q", &key, &op, &val)
  148. if n != 3 {
  149. return nil, fmt.Errorf("malformed comparison: %s; got %s(%q) %s %q", line, target, key, op, val)
  150. }
  151. if serr != nil {
  152. return nil, fmt.Errorf("malformed comparison: %s (%v)", line, serr)
  153. }
  154. var (
  155. v int64
  156. err error
  157. cmp clientv3.Cmp
  158. )
  159. switch target {
  160. case "ver", "version":
  161. if v, err = strconv.ParseInt(val, 10, 64); err == nil {
  162. cmp = clientv3.Compare(clientv3.Version(key), op, v)
  163. }
  164. case "c", "create":
  165. if v, err = strconv.ParseInt(val, 10, 64); err == nil {
  166. cmp = clientv3.Compare(clientv3.CreatedRevision(key), op, v)
  167. }
  168. case "m", "mod":
  169. if v, err = strconv.ParseInt(val, 10, 64); err == nil {
  170. cmp = clientv3.Compare(clientv3.ModifiedRevision(key), op, v)
  171. }
  172. case "val", "value":
  173. cmp = clientv3.Compare(clientv3.Value(key), op, val)
  174. default:
  175. return nil, fmt.Errorf("malformed comparison: %s (unknown target %s)", line, target)
  176. }
  177. if err != nil {
  178. return nil, fmt.Errorf("invalid txn compare request: %s", line)
  179. }
  180. return &cmp, nil
  181. }
  182. func printTxnResponse(resp clientv3.TxnResponse, isHex bool) {
  183. if resp.Succeeded {
  184. fmt.Println("SUCCESS")
  185. } else {
  186. fmt.Println("FAILURE")
  187. }
  188. for _, r := range resp.Responses {
  189. fmt.Println("")
  190. switch v := r.Response.(type) {
  191. case *pb.ResponseUnion_ResponseDeleteRange:
  192. printDeleteResponse((clientv3.DeleteResponse)(*v.ResponseDeleteRange))
  193. case *pb.ResponseUnion_ResponsePut:
  194. printPutResponse((clientv3.PutResponse)(*v.ResponsePut))
  195. case *pb.ResponseUnion_ResponseRange:
  196. printGetResponse(((clientv3.GetResponse)(*v.ResponseRange)), isHex)
  197. default:
  198. fmt.Printf("unexpected response %+v\n", r)
  199. }
  200. }
  201. }