main.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "context"
  17. "flag"
  18. "fmt"
  19. "net/http"
  20. "net/url"
  21. "os"
  22. "os/signal"
  23. "syscall"
  24. "time"
  25. "github.com/coreos/etcd/pkg/transport"
  26. "google.golang.org/grpc/grpclog"
  27. )
  // Command-line options. main binds each variable to its flag before calling flag.Parse.
var (
	// from is the address the proxy is reached at ("--from").
	from string
	// to is the address traffic is forwarded to ("--to").
	to string
	// httpPort is the port serving the etcd-test-proxy HTTP API ("--http-port").
	httpPort int
	// verbose turns on verbose proxy logging ("--verbose").
	verbose bool
)
  32. func main() {
  33. // TODO: support TLS
  34. flag.StringVar(&from, "from", "localhost:23790", "Address URL to proxy from.")
  35. flag.StringVar(&to, "to", "localhost:2379", "Address URL to forward.")
  36. flag.IntVar(&httpPort, "http-port", 2378, "Port to serve etcd-test-proxy API.")
  37. flag.BoolVar(&verbose, "verbose", false, "'true' to run proxy in verbose mode.")
  38. flag.Usage = func() {
  39. fmt.Fprintf(os.Stderr, "Usage of %q:\n", os.Args[0])
  40. fmt.Fprintln(os.Stderr, `
  41. etcd-test-proxy simulates various network conditions for etcd testing purposes.
  42. See README.md for more examples.
  43. Example:
  44. # build etcd
  45. $ ./build
  46. $ ./bin/etcd
  47. # build etcd-test-proxy
  48. $ make build-etcd-test-proxy -f ./hack/scripts-dev/Makefile
  49. # to test etcd with proxy layer
  50. $ ./bin/etcd-test-proxy --help
  51. $ ./bin/etcd-test-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 --verbose
  52. $ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:2379 put foo bar
  53. $ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
  54. flag.PrintDefaults()
  55. }
  56. flag.Parse()
  57. cfg := transport.ProxyConfig{
  58. From: url.URL{Scheme: "tcp", Host: from},
  59. To: url.URL{Scheme: "tcp", Host: to},
  60. }
  61. if verbose {
  62. cfg.Logger = grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 5)
  63. }
  64. p := transport.NewProxy(cfg)
  65. <-p.Ready()
  66. defer p.Close()
  67. mux := http.NewServeMux()
  68. mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
  69. w.Write([]byte(fmt.Sprintf("proxying [%s -> %s]\n", p.From(), p.To())))
  70. })
  71. mux.HandleFunc("/delay-tx", func(w http.ResponseWriter, req *http.Request) {
  72. switch req.Method {
  73. case http.MethodGet:
  74. w.Write([]byte(fmt.Sprintf("current send latency %v\n", p.LatencyTx())))
  75. case http.MethodPut, http.MethodPost:
  76. if err := req.ParseForm(); err != nil {
  77. w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error())))
  78. return
  79. }
  80. lat, err := time.ParseDuration(req.PostForm.Get("latency"))
  81. if err != nil {
  82. w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error())))
  83. return
  84. }
  85. rv, err := time.ParseDuration(req.PostForm.Get("random-variable"))
  86. if err != nil {
  87. w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error())))
  88. return
  89. }
  90. p.DelayTx(lat, rv)
  91. w.Write([]byte(fmt.Sprintf("added send latency %v±%v (current latency %v)\n", lat, rv, p.LatencyTx())))
  92. case http.MethodDelete:
  93. lat := p.LatencyTx()
  94. p.UndelayTx()
  95. w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat)))
  96. default:
  97. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  98. }
  99. })
  100. mux.HandleFunc("/delay-rx", func(w http.ResponseWriter, req *http.Request) {
  101. switch req.Method {
  102. case http.MethodGet:
  103. w.Write([]byte(fmt.Sprintf("current receive latency %v\n", p.LatencyRx())))
  104. case http.MethodPut, http.MethodPost:
  105. if err := req.ParseForm(); err != nil {
  106. w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error())))
  107. return
  108. }
  109. lat, err := time.ParseDuration(req.PostForm.Get("latency"))
  110. if err != nil {
  111. w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error())))
  112. return
  113. }
  114. rv, err := time.ParseDuration(req.PostForm.Get("random-variable"))
  115. if err != nil {
  116. w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error())))
  117. return
  118. }
  119. p.DelayRx(lat, rv)
  120. w.Write([]byte(fmt.Sprintf("added receive latency %v±%v (current latency %v)\n", lat, rv, p.LatencyRx())))
  121. case http.MethodDelete:
  122. lat := p.LatencyRx()
  123. p.UndelayRx()
  124. w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat)))
  125. default:
  126. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  127. }
  128. })
  129. mux.HandleFunc("/pause-tx", func(w http.ResponseWriter, req *http.Request) {
  130. switch req.Method {
  131. case http.MethodPut, http.MethodPost:
  132. p.PauseTx()
  133. w.Write([]byte(fmt.Sprintf("paused forwarding [%s -> %s]\n", p.From(), p.To())))
  134. case http.MethodDelete:
  135. p.UnpauseTx()
  136. w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s -> %s]\n", p.From(), p.To())))
  137. default:
  138. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  139. }
  140. })
  141. mux.HandleFunc("/pause-rx", func(w http.ResponseWriter, req *http.Request) {
  142. switch req.Method {
  143. case http.MethodPut, http.MethodPost:
  144. p.PauseRx()
  145. w.Write([]byte(fmt.Sprintf("paused forwarding [%s <- %s]\n", p.From(), p.To())))
  146. case http.MethodDelete:
  147. p.UnpauseRx()
  148. w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s <- %s]\n", p.From(), p.To())))
  149. default:
  150. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  151. }
  152. })
  153. mux.HandleFunc("/blackhole-tx", func(w http.ResponseWriter, req *http.Request) {
  154. switch req.Method {
  155. case http.MethodPut, http.MethodPost:
  156. p.BlackholeTx()
  157. w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s -> %s]\n", p.From(), p.To())))
  158. case http.MethodDelete:
  159. p.UnblackholeTx()
  160. w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s -> %s]\n", p.From(), p.To())))
  161. default:
  162. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  163. }
  164. })
  165. mux.HandleFunc("/blackhole-rx", func(w http.ResponseWriter, req *http.Request) {
  166. switch req.Method {
  167. case http.MethodPut, http.MethodPost:
  168. p.BlackholeRx()
  169. w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s <- %s]\n", p.From(), p.To())))
  170. case http.MethodDelete:
  171. p.UnblackholeRx()
  172. w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s <- %s]\n", p.From(), p.To())))
  173. default:
  174. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  175. }
  176. })
  177. srv := &http.Server{
  178. Addr: fmt.Sprintf(":%d", httpPort),
  179. Handler: mux,
  180. }
  181. defer srv.Close()
  182. sig := make(chan os.Signal, 1)
  183. signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
  184. defer signal.Stop(sig)
  185. go func() {
  186. s := <-sig
  187. fmt.Printf("\n\nreceived signal %q, shutting down HTTP server\n\n", s)
  188. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  189. err := srv.Shutdown(ctx)
  190. cancel()
  191. fmt.Printf("gracefully stopped HTTP server with %v\n\n", err)
  192. os.Exit(0)
  193. }()
  194. fmt.Printf("\nserving HTTP server http://localhost:%d\n\n", httpPort)
  195. err := srv.ListenAndServe()
  196. fmt.Printf("HTTP server exit with error %v\n", err)
  197. }