watch_command.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "bufio"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "os"
  21. "os/exec"
  22. "strings"
  23. "github.com/coreos/etcd/clientv3"
  24. "github.com/spf13/cobra"
  25. )
  26. var (
  27. errBadArgsNum = errors.New("bad number of arguments")
  28. errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
  29. errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
  30. )
  31. var (
  32. watchRev int64
  33. watchPrefix bool
  34. watchInteractive bool
  35. watchPrevKey bool
  36. )
  37. // NewWatchCommand returns the cobra command for "watch".
  38. func NewWatchCommand() *cobra.Command {
  39. cmd := &cobra.Command{
  40. Use: "watch [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...]",
  41. Short: "Watches events stream on keys or prefixes",
  42. Run: watchCommandFunc,
  43. }
  44. cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
  45. cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
  46. cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
  47. cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
  48. return cmd
  49. }
  50. // watchCommandFunc executes the "watch" command.
  51. func watchCommandFunc(cmd *cobra.Command, args []string) {
  52. if watchInteractive {
  53. watchInteractiveFunc(cmd, os.Args)
  54. return
  55. }
  56. watchArgs, execArgs, err := parseWatchArgs(os.Args, args, false)
  57. if err != nil {
  58. ExitWithError(ExitBadArgs, err)
  59. }
  60. c := mustClientFromCmd(cmd)
  61. wc, err := getWatchChan(c, watchArgs)
  62. if err != nil {
  63. ExitWithError(ExitBadArgs, err)
  64. }
  65. printWatchCh(c, wc, execArgs)
  66. if err = c.Close(); err != nil {
  67. ExitWithError(ExitBadConnection, err)
  68. }
  69. ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
  70. }
  71. func watchInteractiveFunc(cmd *cobra.Command, osArgs []string) {
  72. c := mustClientFromCmd(cmd)
  73. reader := bufio.NewReader(os.Stdin)
  74. for {
  75. l, err := reader.ReadString('\n')
  76. if err != nil {
  77. ExitWithError(ExitInvalidInput, fmt.Errorf("Error reading watch request line: %v", err))
  78. }
  79. l = strings.TrimSuffix(l, "\n")
  80. args := argify(l)
  81. if len(args) < 2 {
  82. fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
  83. continue
  84. }
  85. if args[0] != "watch" {
  86. fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
  87. continue
  88. }
  89. watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, true)
  90. if perr != nil {
  91. ExitWithError(ExitBadArgs, perr)
  92. }
  93. ch, err := getWatchChan(c, watchArgs)
  94. if err != nil {
  95. fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
  96. continue
  97. }
  98. go printWatchCh(c, ch, execArgs)
  99. }
  100. }
  101. func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
  102. if len(args) < 1 {
  103. return nil, errBadArgsNum
  104. }
  105. key := args[0]
  106. opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
  107. if len(args) == 2 {
  108. if watchPrefix {
  109. return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
  110. }
  111. opts = append(opts, clientv3.WithRange(args[1]))
  112. }
  113. if watchPrefix {
  114. opts = append(opts, clientv3.WithPrefix())
  115. }
  116. if watchPrevKey {
  117. opts = append(opts, clientv3.WithPrevKV())
  118. }
  119. return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
  120. }
  121. func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string) {
  122. for resp := range ch {
  123. if resp.Canceled {
  124. fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
  125. }
  126. display.Watch(resp)
  127. if len(execArgs) > 0 {
  128. cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
  129. cmd.Env = os.Environ()
  130. cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
  131. if err := cmd.Run(); err != nil {
  132. fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
  133. }
  134. }
  135. }
  136. }
  137. // "commandArgs" is the command arguments after "spf13/cobra" parses
  138. // all "watch" command flags, strips out special characters (e.g. "--").
  139. // "orArgs" is the raw arguments passed to "watch" command
  140. // (e.g. ./bin/etcdctl watch foo --rev 1 bar).
  141. // "--" characters are invalid arguments for "spf13/cobra" library,
  142. // so no need to handle such cases.
  143. func parseWatchArgs(osArgs, commandArgs []string, interactive bool) (watchArgs []string, execArgs []string, err error) {
  144. watchArgs = commandArgs
  145. // remove preceding commands (e.g. "watch foo bar" in interactive mode)
  146. idx := 0
  147. for idx = range watchArgs {
  148. if watchArgs[idx] == "watch" {
  149. break
  150. }
  151. }
  152. if idx < len(watchArgs)-1 {
  153. watchArgs = watchArgs[idx+1:]
  154. } else if interactive { // "watch" not found
  155. return nil, nil, errBadArgsInteractiveWatch
  156. }
  157. if len(watchArgs) < 1 {
  158. return nil, nil, errBadArgsNum
  159. }
  160. // remove preceding commands (e.g. ./bin/etcdctl watch)
  161. for idx = range osArgs {
  162. if osArgs[idx] == "watch" {
  163. break
  164. }
  165. }
  166. if idx < len(osArgs)-1 {
  167. osArgs = osArgs[idx+1:]
  168. } else {
  169. return nil, nil, errBadArgsNum
  170. }
  171. argsWithSep := osArgs
  172. if interactive { // interactive mode pass "--" to the command args
  173. argsWithSep = watchArgs
  174. }
  175. foundSep := false
  176. for idx = range argsWithSep {
  177. if argsWithSep[idx] == "--" && idx > 0 {
  178. foundSep = true
  179. break
  180. }
  181. }
  182. if interactive {
  183. flagset := NewWatchCommand().Flags()
  184. if err := flagset.Parse(argsWithSep); err != nil {
  185. return nil, nil, err
  186. }
  187. watchArgs = flagset.Args()
  188. }
  189. if !foundSep {
  190. return watchArgs, nil, nil
  191. }
  192. if idx == len(argsWithSep)-1 {
  193. // "watch foo bar --" should error
  194. return nil, nil, errBadArgsNumSeparator
  195. }
  196. execArgs = argsWithSep[idx+1:]
  197. // "watch foo bar --rev 1 -- echo hello" or "watch foo --rev 1 bar -- echo hello",
  198. // then "watchArgs" is "foo bar echo hello"
  199. // so need ignore args after "argsWithSep[idx]", which is "--"
  200. endIdx := 0
  201. for endIdx = len(watchArgs) - 1; endIdx >= 0; endIdx-- {
  202. if watchArgs[endIdx] == argsWithSep[idx+1] {
  203. break
  204. }
  205. }
  206. watchArgs = watchArgs[:endIdx]
  207. return watchArgs, execArgs, nil
  208. }