cluster_test.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. )
  // clientConnType enumerates the ways a member's client listener can be
  // exposed: plain HTTP, HTTPS, or both at once.
  type clientConnType int

  const (
  	// clientNonTLS exposes the client URL over "http" only.
  	clientNonTLS clientConnType = iota
  	// clientTLS exposes the client URL over "https" only.
  	clientTLS
  	// clientTLSAndNonTLS exposes both an "http" and an "https" client URL
  	// on the same host:port.
  	clientTLSAndNonTLS
  )

  // etcdProcessBasePort is the port the first member's client URL uses when a
  // cluster config does not set basePort.
  const etcdProcessBasePort = 20000
  30. var (
  31. configNoTLS = etcdProcessClusterConfig{
  32. clusterSize: 3,
  33. initialToken: "new",
  34. }
  35. configAutoTLS = etcdProcessClusterConfig{
  36. clusterSize: 3,
  37. isPeerTLS: true,
  38. isPeerAutoTLS: true,
  39. initialToken: "new",
  40. }
  41. configTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. clientTLS: clientTLS,
  44. isPeerTLS: true,
  45. initialToken: "new",
  46. }
  47. configClientTLS = etcdProcessClusterConfig{
  48. clusterSize: 3,
  49. clientTLS: clientTLS,
  50. initialToken: "new",
  51. }
  52. configClientBoth = etcdProcessClusterConfig{
  53. clusterSize: 1,
  54. clientTLS: clientTLSAndNonTLS,
  55. initialToken: "new",
  56. }
  57. configClientAutoTLS = etcdProcessClusterConfig{
  58. clusterSize: 1,
  59. isClientAutoTLS: true,
  60. clientTLS: clientTLS,
  61. initialToken: "new",
  62. }
  63. configPeerTLS = etcdProcessClusterConfig{
  64. clusterSize: 3,
  65. isPeerTLS: true,
  66. initialToken: "new",
  67. }
  68. configClientTLSCertAuth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. clientTLS: clientTLS,
  71. initialToken: "new",
  72. clientCertAuthEnabled: true,
  73. }
  74. )
  75. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  76. ret := cfg
  77. ret.clusterSize = 1
  78. return &ret
  79. }
  // etcdProcessCluster groups the etcd processes that make up one test
  // cluster together with the configuration they were created from.
  type etcdProcessCluster struct {
  	// cfg is the configuration passed to newEtcdProcessCluster.
  	cfg *etcdProcessClusterConfig
  	// procs holds one entry per member. An entry can be nil when process
  	// creation failed partway, which is why Stop and Close skip nil entries.
  	procs []etcdProcess
  }
  // etcdProcessClusterConfig describes how the member processes of a test
  // cluster are launched. etcdServerProcessConfigs turns it into one
  // per-member configuration and fills in defaults for basePort, execPath and
  // snapCount when they are left at their zero value.
  type etcdProcessClusterConfig struct {
  	// execPath is the executable path handed to every member; empty means
  	// binPath.
  	execPath string
  	// dataDirPath is passed as --data-dir; when empty, a temporary directory
  	// is created for each member.
  	dataDirPath string
  	// keepDataDir is copied into every member's process config.
  	// NOTE(review): presumably keeps the data directory on close — confirm
  	// against the etcdProcess implementation.
  	keepDataDir bool
  	// clusterSize is the number of member configurations to generate.
  	clusterSize int
  	// baseScheme is the peer URL scheme before the TLS suffix; empty means
  	// "http". An "s" is appended when isPeerTLS is set.
  	baseScheme string
  	// basePort is the first member's client port; empty (0) means
  	// etcdProcessBasePort. Member i uses basePort+5*i for clients, +1 for
  	// peers and +2 for metrics.
  	basePort int
  	// metricsURLScheme, when non-empty, adds --listen-metrics-urls with this
  	// scheme.
  	metricsURLScheme string
  	// snapCount is passed as --snapshot-count; 0 is replaced by
  	// etcdserver.DefaultSnapCount.
  	snapCount int // default is 10000
  	// clientTLS selects which client URL scheme(s) each member serves.
  	clientTLS clientConnType
  	// clientCertAuthEnabled adds --client-cert-auth when client TLS uses
  	// certificate files (it is not added in the auto-TLS case).
  	clientCertAuthEnabled bool
  	// isPeerTLS turns on peer TLS arguments and the "s" scheme suffix.
  	isPeerTLS bool
  	// isPeerAutoTLS, together with isPeerTLS, uses --peer-auto-tls instead of
  	// peer certificate files.
  	isPeerAutoTLS bool
  	// isClientAutoTLS, when client TLS is on, uses --auto-tls instead of
  	// client certificate files.
  	isClientAutoTLS bool
  	// isClientCRL adds --client-crl-file and --client-cert-auth.
  	isClientCRL bool
  	// forceNewCluster adds --force-new-cluster.
  	forceNewCluster bool
  	// initialToken is passed as --initial-cluster-token.
  	initialToken string
  	// quotaBackendBytes, when positive, is passed as --quota-backend-bytes.
  	quotaBackendBytes int64
  	// noStrictReconfig adds --strict-reconfig-check=false.
  	noStrictReconfig bool
  }
  104. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  105. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  106. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  107. etcdCfgs := cfg.etcdServerProcessConfigs()
  108. epc := &etcdProcessCluster{
  109. cfg: cfg,
  110. procs: make([]etcdProcess, cfg.clusterSize),
  111. }
  112. // launch etcd processes
  113. for i := range etcdCfgs {
  114. proc, err := newEtcdProcess(etcdCfgs[i])
  115. if err != nil {
  116. epc.Close()
  117. return nil, err
  118. }
  119. epc.procs[i] = proc
  120. }
  121. if err := epc.Start(); err != nil {
  122. return nil, err
  123. }
  124. return epc, nil
  125. }
  126. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  127. if cfg.clientTLS == clientTLS {
  128. return "https"
  129. }
  130. return "http"
  131. }
  132. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  133. peerScheme := cfg.baseScheme
  134. if peerScheme == "" {
  135. peerScheme = "http"
  136. }
  137. if cfg.isPeerTLS {
  138. peerScheme += "s"
  139. }
  140. return peerScheme
  141. }
  142. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  143. if cfg.basePort == 0 {
  144. cfg.basePort = etcdProcessBasePort
  145. }
  146. if cfg.execPath == "" {
  147. cfg.execPath = binPath
  148. }
  149. if cfg.snapCount == 0 {
  150. cfg.snapCount = etcdserver.DefaultSnapCount
  151. }
  152. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  153. initialCluster := make([]string, cfg.clusterSize)
  154. for i := 0; i < cfg.clusterSize; i++ {
  155. var curls []string
  156. var curl, curltls string
  157. port := cfg.basePort + 5*i
  158. curlHost := fmt.Sprintf("localhost:%d", port)
  159. switch cfg.clientTLS {
  160. case clientNonTLS, clientTLS:
  161. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  162. curls = []string{curl}
  163. case clientTLSAndNonTLS:
  164. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  165. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  166. curls = []string{curl, curltls}
  167. }
  168. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  169. name := fmt.Sprintf("testname%d", i)
  170. dataDirPath := cfg.dataDirPath
  171. if cfg.dataDirPath == "" {
  172. var derr error
  173. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  174. if derr != nil {
  175. panic("could not get tempdir for datadir")
  176. }
  177. }
  178. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  179. args := []string{
  180. "--name", name,
  181. "--listen-client-urls", strings.Join(curls, ","),
  182. "--advertise-client-urls", strings.Join(curls, ","),
  183. "--listen-peer-urls", purl.String(),
  184. "--initial-advertise-peer-urls", purl.String(),
  185. "--initial-cluster-token", cfg.initialToken,
  186. "--data-dir", dataDirPath,
  187. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  188. }
  189. args = addV2Args(args)
  190. if cfg.forceNewCluster {
  191. args = append(args, "--force-new-cluster")
  192. }
  193. if cfg.quotaBackendBytes > 0 {
  194. args = append(args,
  195. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  196. )
  197. }
  198. if cfg.noStrictReconfig {
  199. args = append(args, "--strict-reconfig-check=false")
  200. }
  201. var murl string
  202. if cfg.metricsURLScheme != "" {
  203. murl = (&url.URL{
  204. Scheme: cfg.metricsURLScheme,
  205. Host: fmt.Sprintf("localhost:%d", port+2),
  206. }).String()
  207. args = append(args, "--listen-metrics-urls", murl)
  208. }
  209. args = append(args, cfg.tlsArgs()...)
  210. etcdCfgs[i] = &etcdServerProcessConfig{
  211. execPath: cfg.execPath,
  212. args: args,
  213. tlsArgs: cfg.tlsArgs(),
  214. dataDirPath: dataDirPath,
  215. keepDataDir: cfg.keepDataDir,
  216. name: name,
  217. purl: purl,
  218. acurl: curl,
  219. murl: murl,
  220. initialToken: cfg.initialToken,
  221. }
  222. }
  223. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  224. for i := range etcdCfgs {
  225. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  226. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  227. }
  228. return etcdCfgs
  229. }
  230. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  231. if cfg.clientTLS != clientNonTLS {
  232. if cfg.isClientAutoTLS {
  233. args = append(args, "--auto-tls")
  234. } else {
  235. tlsClientArgs := []string{
  236. "--cert-file", certPath,
  237. "--key-file", privateKeyPath,
  238. "--ca-file", caPath,
  239. }
  240. args = append(args, tlsClientArgs...)
  241. if cfg.clientCertAuthEnabled {
  242. args = append(args, "--client-cert-auth")
  243. }
  244. }
  245. }
  246. if cfg.isPeerTLS {
  247. if cfg.isPeerAutoTLS {
  248. args = append(args, "--peer-auto-tls")
  249. } else {
  250. tlsPeerArgs := []string{
  251. "--peer-cert-file", certPath,
  252. "--peer-key-file", privateKeyPath,
  253. "--peer-ca-file", caPath,
  254. }
  255. args = append(args, tlsPeerArgs...)
  256. }
  257. }
  258. if cfg.isClientCRL {
  259. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  260. }
  261. return args
  262. }
  263. func (epc *etcdProcessCluster) EndpointsV2() []string {
  264. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  265. }
  266. func (epc *etcdProcessCluster) EndpointsV3() []string {
  267. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  268. }
  269. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  270. for _, p := range epc.procs {
  271. ret = append(ret, f(p)...)
  272. }
  273. return ret
  274. }
  275. func (epc *etcdProcessCluster) Start() error {
  276. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  277. }
  278. func (epc *etcdProcessCluster) Restart() error {
  279. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  280. }
  281. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  282. readyC := make(chan error, len(epc.procs))
  283. for i := range epc.procs {
  284. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  285. }
  286. for range epc.procs {
  287. if err := <-readyC; err != nil {
  288. epc.Close()
  289. return err
  290. }
  291. }
  292. return nil
  293. }
  294. func (epc *etcdProcessCluster) Stop() (err error) {
  295. for _, p := range epc.procs {
  296. if p == nil {
  297. continue
  298. }
  299. if curErr := p.Stop(); curErr != nil {
  300. if err != nil {
  301. err = fmt.Errorf("%v; %v", err, curErr)
  302. } else {
  303. err = curErr
  304. }
  305. }
  306. }
  307. return err
  308. }
  309. func (epc *etcdProcessCluster) Close() error {
  310. err := epc.Stop()
  311. for _, p := range epc.procs {
  312. // p is nil when newEtcdProcess fails in the middle
  313. // Close still gets called to clean up test data
  314. if p == nil {
  315. continue
  316. }
  317. if cerr := p.Close(); cerr != nil {
  318. err = cerr
  319. }
  320. }
  321. return err
  322. }
  323. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  324. for _, p := range epc.procs {
  325. ret = p.WithStopSignal(sig)
  326. }
  327. return ret
  328. }