watch_command.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "bufio"
  17. "context"
  18. "errors"
  19. "fmt"
  20. "os"
  21. "os/exec"
  22. "strings"
  23. "github.com/coreos/etcd/clientv3"
  24. "github.com/spf13/cobra"
  25. )
  26. var (
  27. errBadArgsNum = errors.New("bad number of arguments")
  28. errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
  29. errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
  30. errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
  31. )
  32. var (
  33. watchRev int64
  34. watchPrefix bool
  35. watchInteractive bool
  36. watchPrevKey bool
  37. )
  38. // NewWatchCommand returns the cobra command for "watch".
  39. func NewWatchCommand() *cobra.Command {
  40. cmd := &cobra.Command{
  41. Use: "watch [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...]",
  42. Short: "Watches events stream on keys or prefixes",
  43. Run: watchCommandFunc,
  44. }
  45. cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
  46. cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
  47. cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
  48. cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
  49. return cmd
  50. }
  51. // watchCommandFunc executes the "watch" command.
  52. func watchCommandFunc(cmd *cobra.Command, args []string) {
  53. envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
  54. if envKey == "" && envRange != "" {
  55. ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
  56. }
  57. if watchInteractive {
  58. watchInteractiveFunc(cmd, os.Args, envKey, envRange)
  59. return
  60. }
  61. watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
  62. if err != nil {
  63. ExitWithError(ExitBadArgs, err)
  64. }
  65. c := mustClientFromCmd(cmd)
  66. wc, err := getWatchChan(c, watchArgs)
  67. if err != nil {
  68. ExitWithError(ExitBadArgs, err)
  69. }
  70. printWatchCh(c, wc, execArgs)
  71. if err = c.Close(); err != nil {
  72. ExitWithError(ExitBadConnection, err)
  73. }
  74. ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
  75. }
  76. func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
  77. c := mustClientFromCmd(cmd)
  78. reader := bufio.NewReader(os.Stdin)
  79. for {
  80. l, err := reader.ReadString('\n')
  81. if err != nil {
  82. ExitWithError(ExitInvalidInput, fmt.Errorf("Error reading watch request line: %v", err))
  83. }
  84. l = strings.TrimSuffix(l, "\n")
  85. args := argify(l)
  86. switch args[0] {
  87. case "watch":
  88. if len(args) < 2 && envKey == "" {
  89. fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
  90. continue
  91. }
  92. watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
  93. if perr != nil {
  94. ExitWithError(ExitBadArgs, perr)
  95. }
  96. ch, err := getWatchChan(c, watchArgs)
  97. if err != nil {
  98. fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
  99. continue
  100. }
  101. go printWatchCh(c, ch, execArgs)
  102. case "progress":
  103. err := c.RequestProgress(clientv3.WithRequireLeader(context.Background()))
  104. if err != nil {
  105. ExitWithError(ExitError, err)
  106. }
  107. default:
  108. fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
  109. continue
  110. }
  111. }
  112. }
  113. func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
  114. if len(args) < 1 {
  115. return nil, errBadArgsNum
  116. }
  117. key := args[0]
  118. opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
  119. if len(args) == 2 {
  120. if watchPrefix {
  121. return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
  122. }
  123. opts = append(opts, clientv3.WithRange(args[1]))
  124. }
  125. if watchPrefix {
  126. opts = append(opts, clientv3.WithPrefix())
  127. }
  128. if watchPrevKey {
  129. opts = append(opts, clientv3.WithPrevKV())
  130. }
  131. return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
  132. }
  133. func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string) {
  134. for resp := range ch {
  135. if resp.Canceled {
  136. fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
  137. }
  138. if resp.IsProgressNotify() {
  139. fmt.Fprintf(os.Stdout, "progress notify: %d\n", resp.Header.Revision)
  140. }
  141. display.Watch(resp)
  142. if len(execArgs) > 0 {
  143. for _, ev := range resp.Events {
  144. cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
  145. cmd.Env = os.Environ()
  146. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
  147. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
  148. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
  149. cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
  150. cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
  151. if err := cmd.Run(); err != nil {
  152. fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
  153. os.Exit(1)
  154. }
  155. }
  156. }
  157. }
  158. }
  159. // "commandArgs" is the command arguments after "spf13/cobra" parses
  160. // all "watch" command flags, strips out special characters (e.g. "--").
  161. // "orArgs" is the raw arguments passed to "watch" command
  162. // (e.g. ./bin/etcdctl watch foo --rev 1 bar).
  163. // "--" characters are invalid arguments for "spf13/cobra" library,
  164. // so no need to handle such cases.
  165. func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
  166. rawArgs := make([]string, len(osArgs))
  167. copy(rawArgs, osArgs)
  168. watchArgs = make([]string, len(commandArgs))
  169. copy(watchArgs, commandArgs)
  170. // remove preceding commands (e.g. ./bin/etcdctl watch)
  171. // handle "./bin/etcdctl watch foo -- echo watch event"
  172. for idx := range rawArgs {
  173. if rawArgs[idx] == "watch" {
  174. rawArgs = rawArgs[idx+1:]
  175. break
  176. }
  177. }
  178. // remove preceding commands (e.g. "watch foo bar" in interactive mode)
  179. // handle "./bin/etcdctl watch foo -- echo watch event"
  180. if interactive {
  181. if watchArgs[0] != "watch" {
  182. // "watch" not found
  183. watchPrefix, watchRev, watchPrevKey = false, 0, false
  184. return nil, nil, errBadArgsInteractiveWatch
  185. }
  186. watchArgs = watchArgs[1:]
  187. }
  188. execIdx, execExist := 0, false
  189. if !interactive {
  190. for execIdx = range rawArgs {
  191. if rawArgs[execIdx] == "--" {
  192. execExist = true
  193. break
  194. }
  195. }
  196. if execExist && execIdx == len(rawArgs)-1 {
  197. // "watch foo bar --" should error
  198. return nil, nil, errBadArgsNumSeparator
  199. }
  200. // "watch" with no argument should error
  201. if !execExist && len(rawArgs) < 1 && envKey == "" {
  202. return nil, nil, errBadArgsNum
  203. }
  204. if execExist && envKey != "" {
  205. // "ETCDCTL_WATCH_KEY=foo watch foo -- echo 1" should error
  206. // (watchArgs==["foo","echo","1"])
  207. widx, ridx := len(watchArgs)-1, len(rawArgs)-1
  208. for ; widx >= 0; widx-- {
  209. if watchArgs[widx] == rawArgs[ridx] {
  210. ridx--
  211. continue
  212. }
  213. // watchArgs has extra:
  214. // ETCDCTL_WATCH_KEY=foo watch foo -- echo 1
  215. // watchArgs: foo echo 1
  216. if ridx == execIdx {
  217. return nil, nil, errBadArgsNumConflictEnv
  218. }
  219. }
  220. }
  221. // check conflicting arguments
  222. // e.g. "watch --rev 1 -- echo Hello World" has no conflict
  223. if !execExist && len(watchArgs) > 0 && envKey != "" {
  224. // "ETCDCTL_WATCH_KEY=foo watch foo" should error
  225. // (watchArgs==["foo"])
  226. return nil, nil, errBadArgsNumConflictEnv
  227. }
  228. } else {
  229. for execIdx = range watchArgs {
  230. if watchArgs[execIdx] == "--" {
  231. execExist = true
  232. break
  233. }
  234. }
  235. if execExist && execIdx == len(watchArgs)-1 {
  236. // "watch foo bar --" should error
  237. watchPrefix, watchRev, watchPrevKey = false, 0, false
  238. return nil, nil, errBadArgsNumSeparator
  239. }
  240. flagset := NewWatchCommand().Flags()
  241. if perr := flagset.Parse(watchArgs); perr != nil {
  242. watchPrefix, watchRev, watchPrevKey = false, 0, false
  243. return nil, nil, perr
  244. }
  245. pArgs := flagset.Args()
  246. // "watch" with no argument should error
  247. if !execExist && envKey == "" && len(pArgs) < 1 {
  248. watchPrefix, watchRev, watchPrevKey = false, 0, false
  249. return nil, nil, errBadArgsNum
  250. }
  251. // check conflicting arguments
  252. // e.g. "watch --rev 1 -- echo Hello World" has no conflict
  253. if !execExist && len(pArgs) > 0 && envKey != "" {
  254. // "ETCDCTL_WATCH_KEY=foo watch foo" should error
  255. // (watchArgs==["foo"])
  256. watchPrefix, watchRev, watchPrevKey = false, 0, false
  257. return nil, nil, errBadArgsNumConflictEnv
  258. }
  259. }
  260. argsWithSep := rawArgs
  261. if interactive {
  262. // interactive mode directly passes "--" to the command args
  263. argsWithSep = watchArgs
  264. }
  265. idx, foundSep := 0, false
  266. for idx = range argsWithSep {
  267. if argsWithSep[idx] == "--" {
  268. foundSep = true
  269. break
  270. }
  271. }
  272. if foundSep {
  273. execArgs = argsWithSep[idx+1:]
  274. }
  275. if interactive {
  276. flagset := NewWatchCommand().Flags()
  277. if perr := flagset.Parse(argsWithSep); perr != nil {
  278. return nil, nil, perr
  279. }
  280. watchArgs = flagset.Args()
  281. watchPrefix, err = flagset.GetBool("prefix")
  282. if err != nil {
  283. return nil, nil, err
  284. }
  285. watchRev, err = flagset.GetInt64("rev")
  286. if err != nil {
  287. return nil, nil, err
  288. }
  289. watchPrevKey, err = flagset.GetBool("prev-kv")
  290. if err != nil {
  291. return nil, nil, err
  292. }
  293. }
  294. // "ETCDCTL_WATCH_KEY=foo watch -- echo hello"
  295. // should translate "watch foo -- echo hello"
  296. // (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
  297. if envKey != "" {
  298. ranges := []string{envKey}
  299. if envRange != "" {
  300. ranges = append(ranges, envRange)
  301. }
  302. watchArgs = append(ranges, watchArgs...)
  303. }
  304. if !foundSep {
  305. return watchArgs, nil, nil
  306. }
  307. // "watch foo bar --rev 1 -- echo hello" or "watch foo --rev 1 bar -- echo hello",
  308. // then "watchArgs" is "foo bar echo hello"
  309. // so need ignore args after "argsWithSep[idx]", which is "--"
  310. endIdx := 0
  311. for endIdx = len(watchArgs) - 1; endIdx >= 0; endIdx-- {
  312. if watchArgs[endIdx] == argsWithSep[idx+1] {
  313. break
  314. }
  315. }
  316. watchArgs = watchArgs[:endIdx]
  317. return watchArgs, execArgs, nil
  318. }