watch_command.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "bufio"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "os"
  21. "os/exec"
  22. "strings"
  23. "go.etcd.io/etcd/clientv3"
  24. "github.com/spf13/cobra"
  25. )
// Sentinel errors returned while validating the arguments of a watch request.
var (
	// errBadArgsNum is returned when a watch request carries no key to watch.
	errBadArgsNum = errors.New("bad number of arguments")
	// errBadArgsNumConflictEnv is returned by parseWatchArgs when a key is
	// supplied through the environment and on the command line at once.
	errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
	// errBadArgsNumSeparator is returned by parseWatchArgs when "--" is the
	// last argument, i.e. no command follows the separator.
	errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
	// errBadArgsInteractiveWatch is returned by parseWatchArgs when an
	// interactive request does not start with the "watch" token.
	errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
)
// Flag values of the "watch" command. NewWatchCommand binds them to the
// command's flags; parseWatchArgs overwrites watchRev, watchPrefix and
// watchPrevKey for every request parsed in interactive mode.
var (
	// watchRev holds the value of --rev and is passed to clientv3.WithRev.
	watchRev int64
	// watchPrefix holds the value of --prefix; when set the watch uses
	// clientv3.WithPrefix.
	watchPrefix bool
	// watchInteractive holds the value of --interactive / -i.
	watchInteractive bool
	// watchPrevKey holds the value of --prev-kv; when set the watch uses
	// clientv3.WithPrevKV.
	watchPrevKey bool
)
  38. // NewWatchCommand returns the cobra command for "watch".
  39. func NewWatchCommand() *cobra.Command {
  40. cmd := &cobra.Command{
  41. Use: "watch [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...]",
  42. Short: "Watches events stream on keys or prefixes",
  43. Run: watchCommandFunc,
  44. }
  45. cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
  46. cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
  47. cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
  48. cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
  49. return cmd
  50. }
  51. // watchCommandFunc executes the "watch" command.
  52. func watchCommandFunc(cmd *cobra.Command, args []string) {
  53. envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
  54. if envKey == "" && envRange != "" {
  55. ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
  56. }
  57. if watchInteractive {
  58. watchInteractiveFunc(cmd, os.Args, envKey, envRange)
  59. return
  60. }
  61. watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
  62. if err != nil {
  63. ExitWithError(ExitBadArgs, err)
  64. }
  65. c := mustClientFromCmd(cmd)
  66. wc, err := getWatchChan(c, watchArgs)
  67. if err != nil {
  68. ExitWithError(ExitBadArgs, err)
  69. }
  70. printWatchCh(c, wc, execArgs)
  71. if err = c.Close(); err != nil {
  72. ExitWithError(ExitBadConnection, err)
  73. }
  74. ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
  75. }
  76. func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
  77. c := mustClientFromCmd(cmd)
  78. reader := bufio.NewReader(os.Stdin)
  79. for {
  80. l, err := reader.ReadString('\n')
  81. if err != nil {
  82. ExitWithError(ExitInvalidInput, fmt.Errorf("Error reading watch request line: %v", err))
  83. }
  84. l = strings.TrimSuffix(l, "\n")
  85. args := argify(l)
  86. if len(args) < 1 {
  87. fmt.Fprintf(os.Stderr, "Invalid command: %s (watch and progress supported)\n", l)
  88. continue
  89. }
  90. switch args[0] {
  91. case "watch":
  92. if len(args) < 2 && envKey == "" {
  93. fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
  94. continue
  95. }
  96. watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
  97. if perr != nil {
  98. ExitWithError(ExitBadArgs, perr)
  99. }
  100. ch, err := getWatchChan(c, watchArgs)
  101. if err != nil {
  102. fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
  103. continue
  104. }
  105. go printWatchCh(c, ch, execArgs)
  106. case "progress":
  107. err := c.RequestProgress(clientv3.WithRequireLeader(context.Background()))
  108. if err != nil {
  109. ExitWithError(ExitError, err)
  110. }
  111. default:
  112. fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
  113. continue
  114. }
  115. }
  116. }
  117. func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
  118. if len(args) < 1 {
  119. return nil, errBadArgsNum
  120. }
  121. key := args[0]
  122. opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
  123. if len(args) == 2 {
  124. if watchPrefix {
  125. return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
  126. }
  127. opts = append(opts, clientv3.WithRange(args[1]))
  128. }
  129. if watchPrefix {
  130. opts = append(opts, clientv3.WithPrefix())
  131. }
  132. if watchPrevKey {
  133. opts = append(opts, clientv3.WithPrevKV())
  134. }
  135. return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
  136. }
  137. func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string) {
  138. for resp := range ch {
  139. if resp.Canceled {
  140. fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
  141. }
  142. if resp.IsProgressNotify() {
  143. fmt.Fprintf(os.Stdout, "progress notify: %d\n", resp.Header.Revision)
  144. }
  145. display.Watch(resp)
  146. if len(execArgs) > 0 {
  147. for _, ev := range resp.Events {
  148. cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
  149. cmd.Env = os.Environ()
  150. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
  151. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
  152. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
  153. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
  154. cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
  155. if err := cmd.Run(); err != nil {
  156. fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
  157. os.Exit(1)
  158. }
  159. }
  160. }
  161. }
  162. }
  163. // "commandArgs" is the command arguments after "spf13/cobra" parses
  164. // all "watch" command flags, strips out special characters (e.g. "--").
  165. // "orArgs" is the raw arguments passed to "watch" command
  166. // (e.g. ./bin/etcdctl watch foo --rev 1 bar).
  167. // "--" characters are invalid arguments for "spf13/cobra" library,
  168. // so no need to handle such cases.
  169. func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
  170. rawArgs := make([]string, len(osArgs))
  171. copy(rawArgs, osArgs)
  172. watchArgs = make([]string, len(commandArgs))
  173. copy(watchArgs, commandArgs)
  174. // remove preceding commands (e.g. ./bin/etcdctl watch)
  175. // handle "./bin/etcdctl watch foo -- echo watch event"
  176. for idx := range rawArgs {
  177. if rawArgs[idx] == "watch" {
  178. rawArgs = rawArgs[idx+1:]
  179. break
  180. }
  181. }
  182. // remove preceding commands (e.g. "watch foo bar" in interactive mode)
  183. // handle "./bin/etcdctl watch foo -- echo watch event"
  184. if interactive {
  185. if watchArgs[0] != "watch" {
  186. // "watch" not found
  187. watchPrefix, watchRev, watchPrevKey = false, 0, false
  188. return nil, nil, errBadArgsInteractiveWatch
  189. }
  190. watchArgs = watchArgs[1:]
  191. }
  192. execIdx, execExist := 0, false
  193. if !interactive {
  194. for execIdx = range rawArgs {
  195. if rawArgs[execIdx] == "--" {
  196. execExist = true
  197. break
  198. }
  199. }
  200. if execExist && execIdx == len(rawArgs)-1 {
  201. // "watch foo bar --" should error
  202. return nil, nil, errBadArgsNumSeparator
  203. }
  204. // "watch" with no argument should error
  205. if !execExist && len(rawArgs) < 1 && envKey == "" {
  206. return nil, nil, errBadArgsNum
  207. }
  208. if execExist && envKey != "" {
  209. // "ETCDCTL_WATCH_KEY=foo watch foo -- echo 1" should error
  210. // (watchArgs==["foo","echo","1"])
  211. widx, ridx := len(watchArgs)-1, len(rawArgs)-1
  212. for ; widx >= 0; widx-- {
  213. if watchArgs[widx] == rawArgs[ridx] {
  214. ridx--
  215. continue
  216. }
  217. // watchArgs has extra:
  218. // ETCDCTL_WATCH_KEY=foo watch foo -- echo 1
  219. // watchArgs: foo echo 1
  220. if ridx == execIdx {
  221. return nil, nil, errBadArgsNumConflictEnv
  222. }
  223. }
  224. }
  225. // check conflicting arguments
  226. // e.g. "watch --rev 1 -- echo Hello World" has no conflict
  227. if !execExist && len(watchArgs) > 0 && envKey != "" {
  228. // "ETCDCTL_WATCH_KEY=foo watch foo" should error
  229. // (watchArgs==["foo"])
  230. return nil, nil, errBadArgsNumConflictEnv
  231. }
  232. } else {
  233. for execIdx = range watchArgs {
  234. if watchArgs[execIdx] == "--" {
  235. execExist = true
  236. break
  237. }
  238. }
  239. if execExist && execIdx == len(watchArgs)-1 {
  240. // "watch foo bar --" should error
  241. watchPrefix, watchRev, watchPrevKey = false, 0, false
  242. return nil, nil, errBadArgsNumSeparator
  243. }
  244. flagset := NewWatchCommand().Flags()
  245. if perr := flagset.Parse(watchArgs); perr != nil {
  246. watchPrefix, watchRev, watchPrevKey = false, 0, false
  247. return nil, nil, perr
  248. }
  249. pArgs := flagset.Args()
  250. // "watch" with no argument should error
  251. if !execExist && envKey == "" && len(pArgs) < 1 {
  252. watchPrefix, watchRev, watchPrevKey = false, 0, false
  253. return nil, nil, errBadArgsNum
  254. }
  255. // check conflicting arguments
  256. // e.g. "watch --rev 1 -- echo Hello World" has no conflict
  257. if !execExist && len(pArgs) > 0 && envKey != "" {
  258. // "ETCDCTL_WATCH_KEY=foo watch foo" should error
  259. // (watchArgs==["foo"])
  260. watchPrefix, watchRev, watchPrevKey = false, 0, false
  261. return nil, nil, errBadArgsNumConflictEnv
  262. }
  263. }
  264. argsWithSep := rawArgs
  265. if interactive {
  266. // interactive mode directly passes "--" to the command args
  267. argsWithSep = watchArgs
  268. }
  269. idx, foundSep := 0, false
  270. for idx = range argsWithSep {
  271. if argsWithSep[idx] == "--" {
  272. foundSep = true
  273. break
  274. }
  275. }
  276. if foundSep {
  277. execArgs = argsWithSep[idx+1:]
  278. }
  279. if interactive {
  280. flagset := NewWatchCommand().Flags()
  281. if perr := flagset.Parse(argsWithSep); perr != nil {
  282. return nil, nil, perr
  283. }
  284. watchArgs = flagset.Args()
  285. watchPrefix, err = flagset.GetBool("prefix")
  286. if err != nil {
  287. return nil, nil, err
  288. }
  289. watchRev, err = flagset.GetInt64("rev")
  290. if err != nil {
  291. return nil, nil, err
  292. }
  293. watchPrevKey, err = flagset.GetBool("prev-kv")
  294. if err != nil {
  295. return nil, nil, err
  296. }
  297. }
  298. // "ETCDCTL_WATCH_KEY=foo watch -- echo hello"
  299. // should translate "watch foo -- echo hello"
  300. // (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
  301. if envKey != "" {
  302. ranges := []string{envKey}
  303. if envRange != "" {
  304. ranges = append(ranges, envRange)
  305. }
  306. watchArgs = append(ranges, watchArgs...)
  307. }
  308. if !foundSep {
  309. return watchArgs, nil, nil
  310. }
  311. // "watch foo bar --rev 1 -- echo hello" or "watch foo --rev 1 bar -- echo hello",
  312. // then "watchArgs" is "foo bar echo hello"
  313. // so need ignore args after "argsWithSep[idx]", which is "--"
  314. endIdx := 0
  315. for endIdx = len(watchArgs) - 1; endIdx >= 0; endIdx-- {
  316. if watchArgs[endIdx] == argsWithSep[idx+1] {
  317. break
  318. }
  319. }
  320. watchArgs = watchArgs[:endIdx]
  321. return watchArgs, execArgs, nil
  322. }