123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- // Copyright 2015 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package command
- import (
- "bufio"
- "context"
- "errors"
- "fmt"
- "os"
- "os/exec"
- "strings"
- "github.com/coreos/etcd/clientv3"
- "github.com/spf13/cobra"
- )
- var (
- errBadArgsNum = errors.New("bad number of arguments")
- errBadArgsNumConflictEnv = errors.New("bad number of arguments (found conflicting environment key)")
- errBadArgsNumSeparator = errors.New("bad number of arguments (found separator --, but no commands)")
- errBadArgsInteractiveWatch = errors.New("args[0] must be 'watch' for interactive calls")
- )
- var (
- watchRev int64
- watchPrefix bool
- watchInteractive bool
- watchPrevKey bool
- )
- // NewWatchCommand returns the cobra command for "watch".
- func NewWatchCommand() *cobra.Command {
- cmd := &cobra.Command{
- Use: "watch [options] [key or prefix] [range_end] [--] [exec-command arg1 arg2 ...]",
- Short: "Watches events stream on keys or prefixes",
- Run: watchCommandFunc,
- }
- cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
- cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
- cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
- cmd.Flags().BoolVar(&watchPrevKey, "prev-kv", false, "get the previous key-value pair before the event happens")
- return cmd
- }
- // watchCommandFunc executes the "watch" command.
- func watchCommandFunc(cmd *cobra.Command, args []string) {
- envKey, envRange := os.Getenv("ETCDCTL_WATCH_KEY"), os.Getenv("ETCDCTL_WATCH_RANGE_END")
- if envKey == "" && envRange != "" {
- ExitWithError(ExitBadArgs, fmt.Errorf("ETCDCTL_WATCH_KEY is empty but got ETCDCTL_WATCH_RANGE_END=%q", envRange))
- }
- if watchInteractive {
- watchInteractiveFunc(cmd, os.Args, envKey, envRange)
- return
- }
- watchArgs, execArgs, err := parseWatchArgs(os.Args, args, envKey, envRange, false)
- if err != nil {
- ExitWithError(ExitBadArgs, err)
- }
- c := mustClientFromCmd(cmd)
- wc, err := getWatchChan(c, watchArgs)
- if err != nil {
- ExitWithError(ExitBadArgs, err)
- }
- printWatchCh(c, wc, execArgs)
- if err = c.Close(); err != nil {
- ExitWithError(ExitBadConnection, err)
- }
- ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
- }
- func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange string) {
- c := mustClientFromCmd(cmd)
- reader := bufio.NewReader(os.Stdin)
- for {
- l, err := reader.ReadString('\n')
- if err != nil {
- ExitWithError(ExitInvalidInput, fmt.Errorf("Error reading watch request line: %v", err))
- }
- l = strings.TrimSuffix(l, "\n")
- args := argify(l)
- if len(args) < 2 && envKey == "" {
- fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
- continue
- }
- if args[0] != "watch" {
- fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
- continue
- }
- watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
- if perr != nil {
- ExitWithError(ExitBadArgs, perr)
- }
- ch, err := getWatchChan(c, watchArgs)
- if err != nil {
- fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
- continue
- }
- go printWatchCh(c, ch, execArgs)
- }
- }
- func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) {
- if len(args) < 1 {
- return nil, errBadArgsNum
- }
- key := args[0]
- opts := []clientv3.OpOption{clientv3.WithRev(watchRev)}
- if len(args) == 2 {
- if watchPrefix {
- return nil, fmt.Errorf("`range_end` and `--prefix` are mutually exclusive")
- }
- opts = append(opts, clientv3.WithRange(args[1]))
- }
- if watchPrefix {
- opts = append(opts, clientv3.WithPrefix())
- }
- if watchPrevKey {
- opts = append(opts, clientv3.WithPrevKV())
- }
- return c.Watch(clientv3.WithRequireLeader(context.Background()), key, opts...), nil
- }
- func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string) {
- for resp := range ch {
- if resp.Canceled {
- fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
- }
- display.Watch(resp)
- if len(execArgs) > 0 {
- for _, ev := range resp.Events {
- cmd := exec.CommandContext(c.Ctx(), execArgs[0], execArgs[1:]...)
- cmd.Env = os.Environ()
- cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_REVISION=%d", resp.Header.Revision))
- cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_EVENT_TYPE=%q", ev.Type))
- cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_KEY=%q", ev.Kv.Key))
- cmd.Env = append(cmd.Env, fmt.Sprintf("ETCD_WATCH_VALUE=%q", ev.Kv.Value))
- cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
- if err := cmd.Run(); err != nil {
- fmt.Fprintf(os.Stderr, "command %q error (%v)\n", execArgs, err)
- os.Exit(1)
- }
- }
- }
- }
- }
- // "commandArgs" is the command arguments after "spf13/cobra" parses
- // all "watch" command flags, strips out special characters (e.g. "--").
- // "orArgs" is the raw arguments passed to "watch" command
- // (e.g. ./bin/etcdctl watch foo --rev 1 bar).
- // "--" characters are invalid arguments for "spf13/cobra" library,
- // so no need to handle such cases.
- func parseWatchArgs(osArgs, commandArgs []string, envKey, envRange string, interactive bool) (watchArgs []string, execArgs []string, err error) {
- rawArgs := make([]string, len(osArgs))
- copy(rawArgs, osArgs)
- watchArgs = make([]string, len(commandArgs))
- copy(watchArgs, commandArgs)
- // remove preceding commands (e.g. ./bin/etcdctl watch)
- // handle "./bin/etcdctl watch foo -- echo watch event"
- for idx := range rawArgs {
- if rawArgs[idx] == "watch" {
- rawArgs = rawArgs[idx+1:]
- break
- }
- }
- // remove preceding commands (e.g. "watch foo bar" in interactive mode)
- // handle "./bin/etcdctl watch foo -- echo watch event"
- if interactive {
- if watchArgs[0] != "watch" {
- // "watch" not found
- watchPrefix, watchRev, watchPrevKey = false, 0, false
- return nil, nil, errBadArgsInteractiveWatch
- }
- watchArgs = watchArgs[1:]
- }
- execIdx, execExist := 0, false
- if !interactive {
- for execIdx = range rawArgs {
- if rawArgs[execIdx] == "--" {
- execExist = true
- break
- }
- }
- if execExist && execIdx == len(rawArgs)-1 {
- // "watch foo bar --" should error
- return nil, nil, errBadArgsNumSeparator
- }
- // "watch" with no argument should error
- if !execExist && len(rawArgs) < 1 && envKey == "" {
- return nil, nil, errBadArgsNum
- }
- if execExist && envKey != "" {
- // "ETCDCTL_WATCH_KEY=foo watch foo -- echo 1" should error
- // (watchArgs==["foo","echo","1"])
- widx, ridx := len(watchArgs)-1, len(rawArgs)-1
- for ; widx >= 0; widx-- {
- if watchArgs[widx] == rawArgs[ridx] {
- ridx--
- continue
- }
- // watchArgs has extra:
- // ETCDCTL_WATCH_KEY=foo watch foo -- echo 1
- // watchArgs: foo echo 1
- if ridx == execIdx {
- return nil, nil, errBadArgsNumConflictEnv
- }
- }
- }
- // check conflicting arguments
- // e.g. "watch --rev 1 -- echo Hello World" has no conflict
- if !execExist && len(watchArgs) > 0 && envKey != "" {
- // "ETCDCTL_WATCH_KEY=foo watch foo" should error
- // (watchArgs==["foo"])
- return nil, nil, errBadArgsNumConflictEnv
- }
- } else {
- for execIdx = range watchArgs {
- if watchArgs[execIdx] == "--" {
- execExist = true
- break
- }
- }
- if execExist && execIdx == len(watchArgs)-1 {
- // "watch foo bar --" should error
- watchPrefix, watchRev, watchPrevKey = false, 0, false
- return nil, nil, errBadArgsNumSeparator
- }
- flagset := NewWatchCommand().Flags()
- if err := flagset.Parse(watchArgs); err != nil {
- watchPrefix, watchRev, watchPrevKey = false, 0, false
- return nil, nil, err
- }
- pArgs := flagset.Args()
- // "watch" with no argument should error
- if !execExist && envKey == "" && len(pArgs) < 1 {
- watchPrefix, watchRev, watchPrevKey = false, 0, false
- return nil, nil, errBadArgsNum
- }
- // check conflicting arguments
- // e.g. "watch --rev 1 -- echo Hello World" has no conflict
- if !execExist && len(pArgs) > 0 && envKey != "" {
- // "ETCDCTL_WATCH_KEY=foo watch foo" should error
- // (watchArgs==["foo"])
- watchPrefix, watchRev, watchPrevKey = false, 0, false
- return nil, nil, errBadArgsNumConflictEnv
- }
- }
- argsWithSep := rawArgs
- if interactive {
- // interactive mode directly passes "--" to the command args
- argsWithSep = watchArgs
- }
- idx, foundSep := 0, false
- for idx = range argsWithSep {
- if argsWithSep[idx] == "--" {
- foundSep = true
- break
- }
- }
- if foundSep {
- execArgs = argsWithSep[idx+1:]
- }
- if interactive {
- flagset := NewWatchCommand().Flags()
- if err := flagset.Parse(argsWithSep); err != nil {
- return nil, nil, err
- }
- watchArgs = flagset.Args()
- watchPrefix, err = flagset.GetBool("prefix")
- if err != nil {
- return nil, nil, err
- }
- watchRev, err = flagset.GetInt64("rev")
- if err != nil {
- return nil, nil, err
- }
- watchPrevKey, err = flagset.GetBool("prev-kv")
- if err != nil {
- return nil, nil, err
- }
- }
- // "ETCDCTL_WATCH_KEY=foo watch -- echo hello"
- // should translate "watch foo -- echo hello"
- // (watchArgs=["echo","hello"] should be ["foo","echo","hello"])
- if envKey != "" {
- ranges := []string{envKey}
- if envRange != "" {
- ranges = append(ranges, envRange)
- }
- watchArgs = append(ranges, watchArgs...)
- }
- if !foundSep {
- return watchArgs, nil, nil
- }
- // "watch foo bar --rev 1 -- echo hello" or "watch foo --rev 1 bar -- echo hello",
- // then "watchArgs" is "foo bar echo hello"
- // so need ignore args after "argsWithSep[idx]", which is "--"
- endIdx := 0
- for endIdx = len(watchArgs) - 1; endIdx >= 0; endIdx-- {
- if watchArgs[endIdx] == argsWithSep[idx+1] {
- break
- }
- }
- watchArgs = watchArgs[:endIdx]
- return watchArgs, execArgs, nil
- }
|