main.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // etcd-proxy is a proxy layer that simulates various network conditions.
  15. package main
  16. import (
  17. "context"
  18. "flag"
  19. "fmt"
  20. "io/ioutil"
  21. "log"
  22. "net/http"
  23. "net/url"
  24. "os"
  25. "os/signal"
  26. "syscall"
  27. "time"
  28. "go.etcd.io/etcd/pkg/proxy"
  29. "go.uber.org/zap"
  30. )
  31. var from string
  32. var to string
  33. var httpPort int
  34. var verbose bool
  35. func main() {
  36. // TODO: support TLS
  37. flag.StringVar(&from, "from", "localhost:23790", "Address URL to proxy from.")
  38. flag.StringVar(&to, "to", "localhost:2379", "Address URL to forward.")
  39. flag.IntVar(&httpPort, "http-port", 2378, "Port to serve etcd-proxy API.")
  40. flag.BoolVar(&verbose, "verbose", false, "'true' to run proxy in verbose mode.")
  41. flag.Usage = func() {
  42. fmt.Fprintf(os.Stderr, "Usage of %q:\n", os.Args[0])
  43. fmt.Fprintln(os.Stderr, `
  44. etcd-proxy simulates various network conditions for etcd testing purposes.
  45. See README.md for more examples.
  46. Example:
  47. # build etcd
  48. $ ./build
  49. $ ./bin/etcd
  50. # build etcd-proxy
  51. $ make build-etcd-proxy
  52. # to test etcd with proxy layer
  53. $ ./bin/etcd-proxy --help
  54. $ ./bin/etcd-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 --verbose
  55. $ ./bin/etcdctl --endpoints localhost:2379 put foo bar
  56. $ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
  57. flag.PrintDefaults()
  58. }
  59. flag.Parse()
  60. cfg := proxy.ServerConfig{
  61. From: url.URL{Scheme: "tcp", Host: from},
  62. To: url.URL{Scheme: "tcp", Host: to},
  63. }
  64. if verbose {
  65. cfg.Logger = zap.NewExample()
  66. }
  67. p := proxy.NewServer(cfg)
  68. <-p.Ready()
  69. defer p.Close()
  70. mux := http.NewServeMux()
  71. mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
  72. w.Write([]byte(fmt.Sprintf("proxying [%s -> %s]\n", p.From(), p.To())))
  73. })
  74. mux.HandleFunc("/delay-tx", func(w http.ResponseWriter, req *http.Request) {
  75. switch req.Method {
  76. case http.MethodGet:
  77. w.Write([]byte(fmt.Sprintf("current send latency %v\n", p.LatencyTx())))
  78. case http.MethodPut, http.MethodPost:
  79. if err := req.ParseForm(); err != nil {
  80. w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error())))
  81. return
  82. }
  83. lat, err := time.ParseDuration(req.PostForm.Get("latency"))
  84. if err != nil {
  85. w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error())))
  86. return
  87. }
  88. rv, err := time.ParseDuration(req.PostForm.Get("random-variable"))
  89. if err != nil {
  90. w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error())))
  91. return
  92. }
  93. p.DelayTx(lat, rv)
  94. w.Write([]byte(fmt.Sprintf("added send latency %v±%v (current latency %v)\n", lat, rv, p.LatencyTx())))
  95. case http.MethodDelete:
  96. lat := p.LatencyTx()
  97. p.UndelayTx()
  98. w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat)))
  99. default:
  100. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  101. }
  102. })
  103. mux.HandleFunc("/delay-rx", func(w http.ResponseWriter, req *http.Request) {
  104. switch req.Method {
  105. case http.MethodGet:
  106. w.Write([]byte(fmt.Sprintf("current receive latency %v\n", p.LatencyRx())))
  107. case http.MethodPut, http.MethodPost:
  108. if err := req.ParseForm(); err != nil {
  109. w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error())))
  110. return
  111. }
  112. lat, err := time.ParseDuration(req.PostForm.Get("latency"))
  113. if err != nil {
  114. w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error())))
  115. return
  116. }
  117. rv, err := time.ParseDuration(req.PostForm.Get("random-variable"))
  118. if err != nil {
  119. w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error())))
  120. return
  121. }
  122. p.DelayRx(lat, rv)
  123. w.Write([]byte(fmt.Sprintf("added receive latency %v±%v (current latency %v)\n", lat, rv, p.LatencyRx())))
  124. case http.MethodDelete:
  125. lat := p.LatencyRx()
  126. p.UndelayRx()
  127. w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat)))
  128. default:
  129. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  130. }
  131. })
  132. mux.HandleFunc("/pause-tx", func(w http.ResponseWriter, req *http.Request) {
  133. switch req.Method {
  134. case http.MethodPut, http.MethodPost:
  135. p.PauseTx()
  136. w.Write([]byte(fmt.Sprintf("paused forwarding [%s -> %s]\n", p.From(), p.To())))
  137. case http.MethodDelete:
  138. p.UnpauseTx()
  139. w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s -> %s]\n", p.From(), p.To())))
  140. default:
  141. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  142. }
  143. })
  144. mux.HandleFunc("/pause-rx", func(w http.ResponseWriter, req *http.Request) {
  145. switch req.Method {
  146. case http.MethodPut, http.MethodPost:
  147. p.PauseRx()
  148. w.Write([]byte(fmt.Sprintf("paused forwarding [%s <- %s]\n", p.From(), p.To())))
  149. case http.MethodDelete:
  150. p.UnpauseRx()
  151. w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s <- %s]\n", p.From(), p.To())))
  152. default:
  153. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  154. }
  155. })
  156. mux.HandleFunc("/blackhole-tx", func(w http.ResponseWriter, req *http.Request) {
  157. switch req.Method {
  158. case http.MethodPut, http.MethodPost:
  159. p.BlackholeTx()
  160. w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s -> %s]\n", p.From(), p.To())))
  161. case http.MethodDelete:
  162. p.UnblackholeTx()
  163. w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s -> %s]\n", p.From(), p.To())))
  164. default:
  165. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  166. }
  167. })
  168. mux.HandleFunc("/blackhole-rx", func(w http.ResponseWriter, req *http.Request) {
  169. switch req.Method {
  170. case http.MethodPut, http.MethodPost:
  171. p.BlackholeRx()
  172. w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s <- %s]\n", p.From(), p.To())))
  173. case http.MethodDelete:
  174. p.UnblackholeRx()
  175. w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s <- %s]\n", p.From(), p.To())))
  176. default:
  177. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  178. }
  179. })
  180. srv := &http.Server{
  181. Addr: fmt.Sprintf(":%d", httpPort),
  182. Handler: mux,
  183. ErrorLog: log.New(ioutil.Discard, "net/http", 0),
  184. }
  185. defer srv.Close()
  186. sig := make(chan os.Signal, 1)
  187. signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
  188. defer signal.Stop(sig)
  189. go func() {
  190. s := <-sig
  191. fmt.Printf("\n\nreceived signal %q, shutting down HTTP server\n\n", s)
  192. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  193. err := srv.Shutdown(ctx)
  194. cancel()
  195. fmt.Printf("gracefully stopped HTTP server with %v\n\n", err)
  196. os.Exit(0)
  197. }()
  198. fmt.Printf("\nserving HTTP server http://localhost:%d\n\n", httpPort)
  199. err := srv.ListenAndServe()
  200. fmt.Printf("HTTP server exit with error %v\n", err)
  201. }