exec_watch_command.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package command
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "os/exec"
  8. "os/signal"
  9. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
  10. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
  11. )
  12. // NewExecWatchCommand returns the CLI command for "exec-watch".
  13. func NewExecWatchCommand() cli.Command {
  14. return cli.Command{
  15. Name: "exec-watch",
  16. Usage: "watch a key for changes and exec an executable",
  17. Flags: []cli.Flag{
  18. cli.IntFlag{Name: "after-index", Value: 0, Usage: "watch after the given index"},
  19. cli.BoolFlag{Name: "recursive", Usage: "watch all values for key and child keys"},
  20. },
  21. Action: func(c *cli.Context) {
  22. handleKey(c, execWatchCommandFunc)
  23. },
  24. }
  25. }
  26. // execWatchCommandFunc executes the "exec-watch" command.
  27. func execWatchCommandFunc(c *cli.Context, client *etcd.Client) (*etcd.Response, error) {
  28. _ = io.Copy
  29. _ = exec.Command
  30. args := c.Args()
  31. argsLen := len(args)
  32. if argsLen < 2 {
  33. return nil, errors.New("Key and command to exec required")
  34. }
  35. key := args[argsLen-1]
  36. cmdArgs := args[:argsLen-1]
  37. index := 0
  38. if c.Int("after-index") != 0 {
  39. index = c.Int("after-index") + 1
  40. key = args[0]
  41. cmdArgs = args[2:]
  42. }
  43. recursive := c.Bool("recursive")
  44. if recursive != false {
  45. key = args[0]
  46. cmdArgs = args[2:]
  47. }
  48. sigch := make(chan os.Signal, 1)
  49. signal.Notify(sigch, os.Interrupt)
  50. stop := make(chan bool)
  51. go func() {
  52. <-sigch
  53. stop <- true
  54. os.Exit(0)
  55. }()
  56. receiver := make(chan *etcd.Response)
  57. go client.Watch(key, uint64(index), recursive, receiver, stop)
  58. for {
  59. resp := <-receiver
  60. cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
  61. cmd.Env = environResponse(resp, os.Environ())
  62. stdout, err := cmd.StdoutPipe()
  63. if err != nil {
  64. fmt.Fprintf(os.Stderr, err.Error())
  65. os.Exit(1)
  66. }
  67. stderr, err := cmd.StderrPipe()
  68. if err != nil {
  69. fmt.Fprintf(os.Stderr, err.Error())
  70. os.Exit(1)
  71. }
  72. err = cmd.Start()
  73. if err != nil {
  74. fmt.Fprintf(os.Stderr, err.Error())
  75. os.Exit(1)
  76. }
  77. go io.Copy(os.Stdout, stdout)
  78. go io.Copy(os.Stderr, stderr)
  79. cmd.Wait()
  80. }
  81. }
  82. func environResponse(resp *etcd.Response, env []string) []string {
  83. env = append(env, "ETCD_WATCH_ACTION="+resp.Action)
  84. env = append(env, "ETCD_WATCH_MODIFIED_INDEX="+fmt.Sprintf("%d", resp.Node.ModifiedIndex))
  85. env = append(env, "ETCD_WATCH_KEY="+resp.Node.Key)
  86. env = append(env, "ETCD_WATCH_VALUE="+resp.Node.Value)
  87. return env
  88. }