exec_watch_command.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package command
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "os/exec"
  8. "os/signal"
  9. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
  10. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
  11. )
  12. // NewExecWatchCommand returns the CLI command for "exec-watch".
  13. func NewExecWatchCommand() cli.Command {
  14. return cli.Command{
  15. Name: "exec-watch",
  16. Usage: "watch a key for changes and exec an executable",
  17. Flags: []cli.Flag{
  18. cli.IntFlag{Name: "after-index", Value: 0, Usage: "watch after the given index"},
  19. cli.BoolFlag{Name: "recursive", Usage: "watch all values for key and child keys"},
  20. },
  21. Action: func(c *cli.Context) {
  22. handleKey(c, execWatchCommandFunc)
  23. },
  24. }
  25. }
  26. // execWatchCommandFunc executes the "exec-watch" command.
  27. func execWatchCommandFunc(c *cli.Context, client *etcd.Client) (*etcd.Response, error) {
  28. _ = io.Copy
  29. _ = exec.Command
  30. args := c.Args()
  31. argsLen := len(args)
  32. if argsLen < 2 {
  33. return nil, errors.New("Key and command to exec required")
  34. }
  35. key := args[argsLen-1]
  36. cmdArgs := args[:argsLen-1]
  37. index := 0
  38. if c.Int("after-index") != 0 {
  39. index = c.Int("after-index") + 1
  40. key = args[0]
  41. cmdArgs = args[2:]
  42. }
  43. recursive := c.Bool("recursive")
  44. if recursive != false {
  45. key = args[0]
  46. cmdArgs = args[2:]
  47. }
  48. sigch := make(chan os.Signal, 1)
  49. signal.Notify(sigch, os.Interrupt)
  50. stop := make(chan bool)
  51. go func() {
  52. <-sigch
  53. stop <- true
  54. os.Exit(0)
  55. }()
  56. receiver := make(chan *etcd.Response)
  57. client.SetConsistency(etcd.STRONG_CONSISTENCY)
  58. go client.Watch(key, uint64(index), recursive, receiver, stop)
  59. for {
  60. resp := <-receiver
  61. cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
  62. cmd.Env = environResponse(resp, os.Environ())
  63. stdout, err := cmd.StdoutPipe()
  64. if err != nil {
  65. fmt.Fprintf(os.Stderr, err.Error())
  66. os.Exit(1)
  67. }
  68. stderr, err := cmd.StderrPipe()
  69. if err != nil {
  70. fmt.Fprintf(os.Stderr, err.Error())
  71. os.Exit(1)
  72. }
  73. err = cmd.Start()
  74. if err != nil {
  75. fmt.Fprintf(os.Stderr, err.Error())
  76. os.Exit(1)
  77. }
  78. go io.Copy(os.Stdout, stdout)
  79. go io.Copy(os.Stderr, stderr)
  80. cmd.Wait()
  81. }
  82. }
  83. func environResponse(resp *etcd.Response, env []string) []string {
  84. env = append(env, "ETCD_WATCH_ACTION="+resp.Action)
  85. env = append(env, "ETCD_WATCH_MODIFIED_INDEX="+fmt.Sprintf("%d", resp.Node.ModifiedIndex))
  86. env = append(env, "ETCD_WATCH_KEY="+resp.Node.Key)
  87. env = append(env, "ETCD_WATCH_VALUE="+resp.Node.Value)
  88. return env
  89. }