watch_command.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "bufio"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "os"
  21. "os/exec"
  22. "strings"
  23. "github.com/coreos/etcd/clientv3"
  24. "github.com/spf13/cobra"
  25. )
// Errors reported while validating the arguments of the "watch" command.
var (
	// errBadArgsNum is returned when no key to watch could be determined.
	errBadArgsNum = errors.New("bad number of arguments")
	// errBadArgsNumConflictEnv is returned when a key is given on the command
	// line although ETCDCTL_WATCH_KEY already provides one.
	errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
	// errBadArgsNumSeparator is returned when "--" is the last argument, i.e.
	// no command follows the separator.
	errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
	// errBadArgsInteractiveWatch is returned when an interactive request does
	// not start with the "watch" token.
	errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
)
// Package-level storage for the "watch" command flags; NewWatchCommand binds
// each flag to one of these variables.
var (
	// watchRev is the value of --rev: the revision to start watching from.
	watchRev int64
	// watchPrefix is the value of --prefix: watch on a prefix.
	watchPrefix bool
	// watchInteractive is the value of --interactive/-i: read watch requests
	// from standard input.
	watchInteractive bool
	// watchPrevKey is the value of --prev-kv: request the previous key-value
	// pair with every event.
	watchPrevKey bool
)
  38. // NewWatchCommand returns the cobra command for "watch".
  39. func NewWatchCommand() *cobra.Command {
  40. cmd := &cobra.Command{
  41. Use: "watch [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...]",
  42. Short: "Watches events stream on keys or prefixes",
  43. Run: watchCommandFunc,
  44. }
  45. cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
  46. cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
  47. cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
  48. cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
  49. return cmd
  50. }
  51. // watchCommandFunc executes the "watch" command.
  52. func watchCommandFunc(cmd *cobra.Command, args []string) {
  53. envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
  54. if envKey == "" && envRange != "" {
  55. ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
  56. }
  57. if watchInteractive {
  58. watchInteractiveFunc(cmd, os.Args, envKey, envRange)
  59. return
  60. }
  61. watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
  62. if err != nil {
  63. ExitWithError(ExitBadArgs, err)
  64. }
  65. c := mustClientFromCmd(cmd)
  66. wc, err := getWatchChan(c, watchArgs)
  67. if err != nil {
  68. ExitWithError(ExitBadArgs, err)
  69. }
  70. printWatchCh(c, wc, execArgs)
  71. if err = c.Close(); err != nil {
  72. ExitWithError(ExitBadConnection, err)
  73. }
  74. ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
  75. }
  76. func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
  77. c := mustClientFromCmd(cmd)
  78. reader := bufio.NewReader(os.Stdin)
  79. for {
  80. l, err := reader.ReadString('\n')
  81. if err != nil {
  82. ExitWithError(ExitInvalidInput, fmt.Errorf("Error reading watch request line: %v", err))
  83. }
  84. l = strings.TrimSuffix(l, "\n")
  85. args := argify(l)
  86. if len(args) < 2 && envKey == "" {
  87. fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
  88. continue
  89. }
  90. if args[0] != "watch" {
  91. fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
  92. continue
  93. }
  94. watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
  95. if perr != nil {
  96. ExitWithError(ExitBadArgs, perr)
  97. }
  98. ch, err := getWatchChan(c, watchArgs)
  99. if err != nil {
  100. fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
  101. continue
  102. }
  103. go printWatchCh(c, ch, execArgs)
  104. }
  105. }
  106. func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
  107. if len(args) < 1 {
  108. return nil, errBadArgsNum
  109. }
  110. key := args[0]
  111. opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
  112. if len(args) == 2 {
  113. if watchPrefix {
  114. return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
  115. }
  116. opts = append(opts, clientv3.WithRange(args[1]))
  117. }
  118. if watchPrefix {
  119. opts = append(opts, clientv3.WithPrefix())
  120. }
  121. if watchPrevKey {
  122. opts = append(opts, clientv3.WithPrevKV())
  123. }
  124. return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
  125. }
  126. func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string) {
  127. for resp := range ch {
  128. if resp.Canceled {
  129. fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
  130. }
  131. display.Watch(resp)
  132. if len(execArgs) > 0 {
  133. for _, ev := range resp.Events {
  134. cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
  135. cmd.Env = os.Environ()
  136. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
  137. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
  138. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
  139. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
  140. cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
  141. if err := cmd.Run(); err != nil {
  142. fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
  143. os.Exit(1)
  144. }
  145. }
  146. }
  147. }
  148. }
  149. // "commandArgs" is the command arguments after "spf13/cobra" parses
  150. // all "watch" command flags, strips out special characters (e.g. "--").
  151. // "orArgs" is the raw arguments passed to "watch" command
  152. // (e.g. ./bin/etcdctl watch foo --rev 1 bar).
  153. // "--" characters are invalid arguments for "spf13/cobra" library,
  154. // so no need to handle such cases.
  155. func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
  156. rawArgs := make([]string, len(osArgs))
  157. copy(rawArgs, osArgs)
  158. watchArgs = make([]string, len(commandArgs))
  159. copy(watchArgs, commandArgs)
  160. // remove preceding commands (e.g. ./bin/etcdctl watch)
  161. // handle "./bin/etcdctl watch foo -- echo watch event"
  162. for idx := range rawArgs {
  163. if rawArgs[idx] == "watch" {
  164. rawArgs = rawArgs[idx+1:]
  165. break
  166. }
  167. }
  168. // remove preceding commands (e.g. "watch foo bar" in interactive mode)
  169. // handle "./bin/etcdctl watch foo -- echo watch event"
  170. if interactive {
  171. if watchArgs[0] != "watch" {
  172. // "watch" not found
  173. watchPrefix, watchRev, watchPrevKey = false, 0, false
  174. return nil, nil, errBadArgsInteractiveWatch
  175. }
  176. watchArgs = watchArgs[1:]
  177. }
  178. execIdx, execExist := 0, false
  179. if !interactive {
  180. for execIdx = range rawArgs {
  181. if rawArgs[execIdx] == "--" {
  182. execExist = true
  183. break
  184. }
  185. }
  186. if execExist && execIdx == len(rawArgs)-1 {
  187. // "watch foo bar --" should error
  188. return nil, nil, errBadArgsNumSeparator
  189. }
  190. // "watch" with no argument should error
  191. if !execExist && len(rawArgs) < 1 && envKey == "" {
  192. return nil, nil, errBadArgsNum
  193. }
  194. if execExist && envKey != "" {
  195. // "ETCDCTL_WATCH_KEY=foo watch foo -- echo 1" should error
  196. // (watchArgs==["foo","echo","1"])
  197. widx, ridx := len(watchArgs)-1, len(rawArgs)-1
  198. for ; widx >= 0; widx-- {
  199. if watchArgs[widx] == rawArgs[ridx] {
  200. ridx--
  201. continue
  202. }
  203. // watchArgs has extra:
  204. // ETCDCTL_WATCH_KEY=foo watch foo -- echo 1
  205. // watchArgs: foo echo 1
  206. if ridx == execIdx {
  207. return nil, nil, errBadArgsNumConflictEnv
  208. }
  209. }
  210. }
  211. // check conflicting arguments
  212. // e.g. "watch --rev 1 -- echo Hello World" has no conflict
  213. if !execExist && len(watchArgs) > 0 && envKey != "" {
  214. // "ETCDCTL_WATCH_KEY=foo watch foo" should error
  215. // (watchArgs==["foo"])
  216. return nil, nil, errBadArgsNumConflictEnv
  217. }
  218. } else {
  219. for execIdx = range watchArgs {
  220. if watchArgs[execIdx] == "--" {
  221. execExist = true
  222. break
  223. }
  224. }
  225. if execExist && execIdx == len(watchArgs)-1 {
  226. // "watch foo bar --" should error
  227. watchPrefix, watchRev, watchPrevKey = false, 0, false
  228. return nil, nil, errBadArgsNumSeparator
  229. }
  230. flagset := NewWatchCommand().Flags()
  231. if err := flagset.Parse(watchArgs); err != nil {
  232. watchPrefix, watchRev, watchPrevKey = false, 0, false
  233. return nil, nil, err
  234. }
  235. pArgs := flagset.Args()
  236. // "watch" with no argument should error
  237. if !execExist && envKey == "" && len(pArgs) < 1 {
  238. watchPrefix, watchRev, watchPrevKey = false, 0, false
  239. return nil, nil, errBadArgsNum
  240. }
  241. // check conflicting arguments
  242. // e.g. "watch --rev 1 -- echo Hello World" has no conflict
  243. if !execExist && len(pArgs) > 0 && envKey != "" {
  244. // "ETCDCTL_WATCH_KEY=foo watch foo" should error
  245. // (watchArgs==["foo"])
  246. watchPrefix, watchRev, watchPrevKey = false, 0, false
  247. return nil, nil, errBadArgsNumConflictEnv
  248. }
  249. }
  250. argsWithSep := rawArgs
  251. if interactive {
  252. // interactive mode directly passes "--" to the command args
  253. argsWithSep = watchArgs
  254. }
  255. idx, foundSep := 0, false
  256. for idx = range argsWithSep {
  257. if argsWithSep[idx] == "--" {
  258. foundSep = true
  259. break
  260. }
  261. }
  262. if foundSep {
  263. execArgs = argsWithSep[idx+1:]
  264. }
  265. if interactive {
  266. flagset := NewWatchCommand().Flags()
  267. if err := flagset.Parse(argsWithSep); err != nil {
  268. return nil, nil, err
  269. }
  270. watchArgs = flagset.Args()
  271. watchPrefix, err = flagset.GetBool("prefix")
  272. if err != nil {
  273. return nil, nil, err
  274. }
  275. watchRev, err = flagset.GetInt64("rev")
  276. if err != nil {
  277. return nil, nil, err
  278. }
  279. watchPrevKey, err = flagset.GetBool("prev-kv")
  280. if err != nil {
  281. return nil, nil, err
  282. }
  283. }
  284. // "ETCDCTL_WATCH_KEY=foo watch -- echo hello"
  285. // should translate "watch foo -- echo hello"
  286. // (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
  287. if envKey != "" {
  288. ranges := []string{envKey}
  289. if envRange != "" {
  290. ranges = append(ranges, envRange)
  291. }
  292. watchArgs = append(ranges, watchArgs...)
  293. }
  294. if !foundSep {
  295. return watchArgs, nil, nil
  296. }
  297. // "watch foo bar --rev 1 -- echo hello" or "watch foo --rev 1 bar -- echo hello",
  298. // then "watchArgs" is "foo bar echo hello"
  299. // so need ignore args after "argsWithSep[idx]", which is "--"
  300. endIdx := 0
  301. for endIdx = len(watchArgs) - 1; endIdx >= 0; endIdx-- {
  302. if watchArgs[endIdx] == argsWithSep[idx+1] {
  303. break
  304. }
  305. }
  306. watchArgs = watchArgs[:endIdx]
  307. return watchArgs, execArgs, nil
  308. }