watch_command.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "bufio"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "os"
  21. "os/exec"
  22. "strings"
  23. "github.com/coreos/etcd/clientv3"
  24. "github.com/spf13/cobra"
  25. )
  26. var (
  27. errBadArgsNum = errors.New("bad number of arguments")
  28. errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
  29. errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
  30. errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
  31. )
  32. var (
  33. watchRev int64
  34. watchPrefix bool
  35. watchInteractive bool
  36. watchPrevKey bool
  37. )
  38. // NewWatchCommand returns the cobra command for "watch".
  39. func NewWatchCommand() *cobra.Command {
  40. cmd := &cobra.Command{
  41. Use: "watch [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...]",
  42. Short: "Watches events stream on keys or prefixes",
  43. Run: watchCommandFunc,
  44. }
  45. cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
  46. cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
  47. cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
  48. cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
  49. return cmd
  50. }
  51. // watchCommandFunc executes the "watch" command.
  52. func watchCommandFunc(cmd *cobra.Command, args []string) {
  53. envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
  54. if envKey == "" && envRange != "" {
  55. ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
  56. }
  57. if watchInteractive {
  58. watchInteractiveFunc(cmd, os.Args, envKey, envRange)
  59. return
  60. }
  61. watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
  62. if err != nil {
  63. ExitWithError(ExitBadArgs, err)
  64. }
  65. c := mustClientFromCmd(cmd)
  66. wc, err := getWatchChan(c, watchArgs)
  67. if err != nil {
  68. ExitWithError(ExitBadArgs, err)
  69. }
  70. printWatchCh(c, wc, execArgs)
  71. if err = c.Close(); err != nil {
  72. ExitWithError(ExitBadConnection, err)
  73. }
  74. ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
  75. }
  76. func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
  77. c := mustClientFromCmd(cmd)
  78. reader := bufio.NewReader(os.Stdin)
  79. for {
  80. l, err := reader.ReadString('\n')
  81. if err != nil {
  82. ExitWithError(ExitInvalidInput, fmt.Errorf("Error reading watch request line: %v", err))
  83. }
  84. l = strings.TrimSuffix(l, "\n")
  85. args := argify(l)
  86. if len(args) < 2 && envKey == "" {
  87. fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
  88. continue
  89. }
  90. if args[0] != "watch" {
  91. fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
  92. continue
  93. }
  94. watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
  95. if perr != nil {
  96. ExitWithError(ExitBadArgs, perr)
  97. }
  98. ch, err := getWatchChan(c, watchArgs)
  99. if err != nil {
  100. fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
  101. continue
  102. }
  103. go printWatchCh(c, ch, execArgs)
  104. }
  105. }
  106. func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
  107. if len(args) < 1 {
  108. return nil, errBadArgsNum
  109. }
  110. key := args[0]
  111. opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
  112. if len(args) == 2 {
  113. if watchPrefix {
  114. return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
  115. }
  116. opts = append(opts, clientv3.WithRange(args[1]))
  117. }
  118. if watchPrefix {
  119. opts = append(opts, clientv3.WithPrefix())
  120. }
  121. if watchPrevKey {
  122. opts = append(opts, clientv3.WithPrevKV())
  123. }
  124. return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
  125. }
  126. func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string) {
  127. for resp := range ch {
  128. if resp.Canceled {
  129. fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
  130. }
  131. display.Watch(resp)
  132. if len(execArgs) > 0 {
  133. for _, ev := range resp.Events {
  134. cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
  135. cmd.Env = os.Environ()
  136. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
  137. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
  138. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
  139. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
  140. cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
  141. if err := cmd.Run(); err != nil {
  142. fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
  143. os.Exit(1)
  144. }
  145. }
  146. }
  147. }
  148. }
  149. // "commandArgs" is the command arguments after "spf13/cobra" parses
  150. // all "watch" command flags, strips out special characters (e.g. "--").
  151. // "orArgs" is the raw arguments passed to "watch" command
  152. // (e.g. ./bin/etcdctl watch foo --rev 1 bar).
  153. // "--" characters are invalid arguments for "spf13/cobra" library,
  154. // so no need to handle such cases.
  155. func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
  156. watchArgs = commandArgs
  157. // remove preceding commands (e.g. "watch foo bar" in interactive mode)
  158. idx := 0
  159. for idx = range watchArgs {
  160. if watchArgs[idx] == "watch" {
  161. break
  162. }
  163. }
  164. if idx < len(watchArgs)-1 || envKey != "" {
  165. if idx < len(watchArgs)-1 {
  166. watchArgs = watchArgs[idx+1:]
  167. }
  168. execIdx, execExist := 0, false
  169. for execIdx = range osArgs {
  170. v := osArgs[execIdx]
  171. if v == "--" && execIdx != len(osArgs)-1 {
  172. execExist = true
  173. break
  174. }
  175. }
  176. if idx == len(watchArgs)-1 && envKey != "" {
  177. if len(watchArgs) > 0 && !interactive {
  178. // "watch --rev 1 -- echo Hello World" has no conflict
  179. if !execExist {
  180. // "watch foo" with ETCDCTL_WATCH_KEY=foo
  181. // (watchArgs==["foo"])
  182. return nil, nil, errBadArgsNumConflictEnv
  183. }
  184. }
  185. // otherwise, watch with no argument and environment key is set
  186. // if interactive, first "watch" command string should be removed
  187. if interactive {
  188. watchArgs = []string{}
  189. }
  190. }
  191. // "watch foo -- echo hello" with ETCDCTL_WATCH_KEY=foo
  192. // (watchArgs==["foo","echo","hello"])
  193. if envKey != "" && execExist {
  194. widx, oidx := 0, len(osArgs)-1
  195. for widx = len(watchArgs) - 1; widx >= 0; widx-- {
  196. if watchArgs[widx] == osArgs[oidx] {
  197. oidx--
  198. continue
  199. }
  200. if oidx == execIdx { // watchArgs has extra
  201. return nil, nil, errBadArgsNumConflictEnv
  202. }
  203. }
  204. }
  205. } else if interactive { // "watch" not found
  206. return nil, nil, errBadArgsInteractiveWatch
  207. }
  208. if len(watchArgs) < 1 && envKey == "" {
  209. return nil, nil, errBadArgsNum
  210. }
  211. // remove preceding commands (e.g. ./bin/etcdctl watch)
  212. for idx = range osArgs {
  213. if osArgs[idx] == "watch" {
  214. break
  215. }
  216. }
  217. if idx < len(osArgs)-1 {
  218. osArgs = osArgs[idx+1:]
  219. } else if envKey == "" {
  220. return nil, nil, errBadArgsNum
  221. }
  222. argsWithSep := osArgs
  223. if interactive { // interactive mode pass "--" to the command args
  224. argsWithSep = watchArgs
  225. }
  226. foundSep := false
  227. for idx = range argsWithSep {
  228. if argsWithSep[idx] == "--" {
  229. foundSep = true
  230. break
  231. }
  232. }
  233. if interactive {
  234. flagset := NewWatchCommand().Flags()
  235. if err := flagset.Parse(argsWithSep); err != nil {
  236. return nil, nil, err
  237. }
  238. watchArgs = flagset.Args()
  239. }
  240. // "watch -- echo hello" with ETCDCTL_WATCH_KEY=foo
  241. // should be translated to "watch foo -- echo hello"
  242. // (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
  243. if envKey != "" {
  244. tmp := []string{envKey}
  245. if envRange != "" {
  246. tmp = append(tmp, envRange)
  247. }
  248. watchArgs = append(tmp, watchArgs...)
  249. }
  250. if !foundSep {
  251. return watchArgs, nil, nil
  252. }
  253. if idx == len(argsWithSep)-1 {
  254. // "watch foo bar --" should error
  255. return nil, nil, errBadArgsNumSeparator
  256. }
  257. execArgs = argsWithSep[idx+1:]
  258. // "watch foo bar --rev 1 -- echo hello" or "watch foo --rev 1 bar -- echo hello",
  259. // then "watchArgs" is "foo bar echo hello"
  260. // so need ignore args after "argsWithSep[idx]", which is "--"
  261. endIdx := 0
  262. for endIdx = len(watchArgs) - 1; endIdx >= 0; endIdx-- {
  263. if watchArgs[endIdx] == argsWithSep[idx+1] {
  264. break
  265. }
  266. }
  267. watchArgs = watchArgs[:endIdx]
  268. return watchArgs, execArgs, nil
  269. }