cluster_health.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package command
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "os/signal"
  9. "sort"
  10. "time"
  11. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
  12. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  13. )
  14. func NewClusterHealthCommand() cli.Command {
  15. return cli.Command{
  16. Name: "cluster-health",
  17. Usage: "check the health of the etcd cluster",
  18. Flags: []cli.Flag{
  19. cli.BoolFlag{Name: "forever", Usage: "forever check the health every 10 second until CTRL+C"},
  20. },
  21. Action: handleClusterHealth,
  22. }
  23. }
  24. func handleClusterHealth(c *cli.Context) {
  25. forever := c.Bool("forever")
  26. if forever {
  27. sigch := make(chan os.Signal, 1)
  28. signal.Notify(sigch, os.Interrupt)
  29. go func() {
  30. <-sigch
  31. os.Exit(0)
  32. }()
  33. }
  34. tr, err := getTransport(c)
  35. if err != nil {
  36. handleError(ExitServerError, err)
  37. }
  38. // TODO: update members when forever is set.
  39. mi := mustNewMembersAPI(c)
  40. ms, err := mi.List(context.TODO())
  41. if err != nil {
  42. fmt.Println("cluster may be unhealthy: failed to list members")
  43. handleError(ExitServerError, err)
  44. }
  45. cl := make([]string, 0)
  46. for _, m := range ms {
  47. cl = append(cl, m.ClientURLs...)
  48. }
  49. for {
  50. // check the /health endpoint of all members first
  51. ep, rs0, err := getLeaderStatus(tr, cl)
  52. if err != nil {
  53. fmt.Println("cluster may be unhealthy: failed to connect", cl)
  54. if forever {
  55. time.Sleep(10 * time.Second)
  56. continue
  57. }
  58. os.Exit(1)
  59. }
  60. time.Sleep(time.Second)
  61. // are all the members makeing progress?
  62. _, rs1, err := getLeaderStatus(tr, []string{ep})
  63. if err != nil {
  64. fmt.Println("cluster is unhealthy")
  65. if forever {
  66. time.Sleep(10 * time.Second)
  67. continue
  68. }
  69. os.Exit(1)
  70. }
  71. if rs1.Commit > rs0.Commit {
  72. fmt.Printf("cluster is healthy: raft is making progress [commit index: %v->%v]\n", rs0.Commit, rs1.Commit)
  73. } else {
  74. fmt.Printf("cluster is unhealthy: raft is not making progress [commit index: %v]\n", rs0.Commit)
  75. }
  76. fmt.Printf("leader is %v\n", rs0.Lead)
  77. var prints []string
  78. for id, pr0 := range rs0.Progress {
  79. pr1, ok := rs1.Progress[id]
  80. if !ok {
  81. // TODO: forever should handle configuration change.
  82. fmt.Println("Cluster configuration changed during health checking. Please retry.")
  83. os.Exit(1)
  84. }
  85. if pr1.Match <= pr0.Match {
  86. prints = append(prints, fmt.Sprintf("member %s is unhealthy: raft is not making progress [match: %v->%v]\n", id, pr0.Match, pr1.Match))
  87. } else {
  88. prints = append(prints, fmt.Sprintf("member %s is healthy: raft is making progress [match: %v->%v]\n", id, pr0.Match, pr1.Match))
  89. }
  90. }
  91. sort.Strings(prints)
  92. for _, p := range prints {
  93. fmt.Print(p)
  94. }
  95. if !forever {
  96. return
  97. }
  98. time.Sleep(10 * time.Second)
  99. }
  100. }
// raftStatus is the shape getLeaderStatus decodes from the "raft.status" entry
// of a member's /debug/vars response.
type raftStatus struct {
	// ID identifies the reporting member; getLeaderStatus treats the member
	// as leader when ID equals Lead.
	ID string `json:"id"`
	// Term is decoded but not read by the code in this file (presumably the
	// current raft term — confirm against the server).
	Term uint64 `json:"term"`
	// Vote is decoded but not read by the code in this file.
	Vote string `json:"vote"`
	// Commit is the commit index; handleClusterHealth compares two samples of
	// it to decide whether the cluster is making progress.
	Commit uint64 `json:"commit"`
	// Lead is the leader's ID as seen by the reporting member.
	Lead string `json:"lead"`
	// RaftState is decoded but not read by the code in this file.
	RaftState string `json:"raftState"`
	// Progress holds one entry per member, keyed by member ID.
	// handleClusterHealth compares Match across two samples to judge each
	// member; Next and State are decoded but not read here.
	Progress map[string]struct {
		Match uint64 `json:"match"`
		Next  uint64 `json:"next"`
		State string `json:"state"`
	} `json:"progress"`
}
// vars is the part of a /debug/vars response body that getLeaderStatus
// decodes: only the top-level "raft.status" key is extracted.
type vars struct {
	RaftStatus raftStatus `json:"raft.status"`
}
  117. func getLeaderStatus(tr *http.Transport, endpoints []string) (string, raftStatus, error) {
  118. // TODO: use new etcd client
  119. httpclient := http.Client{
  120. Transport: tr,
  121. }
  122. for _, ep := range endpoints {
  123. resp, err := httpclient.Get(ep + "/debug/vars")
  124. if err != nil {
  125. continue
  126. }
  127. defer resp.Body.Close()
  128. if resp.StatusCode != http.StatusOK {
  129. continue
  130. }
  131. vs := &vars{}
  132. d := json.NewDecoder(resp.Body)
  133. err = d.Decode(vs)
  134. if err != nil {
  135. continue
  136. }
  137. if vs.RaftStatus.Lead != vs.RaftStatus.ID {
  138. continue
  139. }
  140. return ep, vs.RaftStatus, nil
  141. }
  142. return "", raftStatus{}, errors.New("no leader")
  143. }