cluster_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. )
// etcdProcessBasePort is the base port used for a cluster when
// etcdProcessClusterConfig.basePort is left at zero.
const etcdProcessBasePort = 20000

// clientConnType selects which URL scheme(s) a member uses for its client
// listen/advertise URLs.
type clientConnType int

const (
	// clientNonTLS is the zero value: a single "http" client URL.
	clientNonTLS clientConnType = iota
	// clientTLS yields a single "https" client URL.
	clientTLS
	// clientTLSAndNonTLS yields both an "http" and an "https" client URL on
	// the same host:port.
	clientTLSAndNonTLS
)
// Preset cluster configurations shared by the e2e tests. Fields that are not
// listed keep their zero value (e.g. clientTLS defaults to clientNonTLS).
var (
	// configNoTLS is a 3-member cluster with neither client nor peer TLS.
	configNoTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		initialToken: "new",
	}
	// configAutoTLS is a 3-member cluster whose peers use TLS via
	// --peer-auto-tls; clients stay on plain http.
	configAutoTLS = etcdProcessClusterConfig{
		clusterSize:   3,
		isPeerTLS:     true,
		isPeerAutoTLS: true,
		initialToken:  "new",
	}
	// configTLS is a 3-member cluster with certificate-file TLS for both
	// clients and peers.
	configTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		clientTLS:    clientTLS,
		isPeerTLS:    true,
		initialToken: "new",
	}
	// configClientTLS is a 3-member cluster with client TLS only.
	configClientTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		clientTLS:    clientTLS,
		initialToken: "new",
	}
	// configClientBoth is a single member that serves clients on both http
	// and https.
	configClientBoth = etcdProcessClusterConfig{
		clusterSize:  1,
		clientTLS:    clientTLSAndNonTLS,
		initialToken: "new",
	}
	// configClientAutoTLS is a single member with client TLS enabled via
	// --auto-tls instead of certificate files.
	configClientAutoTLS = etcdProcessClusterConfig{
		clusterSize:     1,
		isClientAutoTLS: true,
		clientTLS:       clientTLS,
		initialToken:    "new",
	}
	// configPeerTLS is a 3-member cluster with certificate-file peer TLS only.
	configPeerTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		isPeerTLS:    true,
		initialToken: "new",
	}
	// configClientTLSCertAuth is a single member with client TLS and
	// --client-cert-auth.
	configClientTLSCertAuth = etcdProcessClusterConfig{
		clusterSize:           1,
		clientTLS:             clientTLS,
		initialToken:          "new",
		clientCertAuthEnabled: true,
	}
	// configClientTLSCertAuthWithNoCN matches configClientTLSCertAuth but also
	// sets noCN.
	// NOTE(review): noCN is not consumed in this part of the file; presumably
	// it selects a client certificate without a CommonName — confirm at the
	// use site.
	configClientTLSCertAuthWithNoCN = etcdProcessClusterConfig{
		clusterSize:           1,
		clientTLS:             clientTLS,
		initialToken:          "new",
		clientCertAuthEnabled: true,
		noCN:                  true,
	}
	// configJWT is a single member started with --auth-token set to JWT
	// options (RS256 signing, 1s TTL, keys taken from the integration
	// fixtures directory).
	configJWT = etcdProcessClusterConfig{
		clusterSize:   1,
		initialToken:  "new",
		authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s",
	}
)
  86. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  87. ret := cfg
  88. ret.clusterSize = 1
  89. return &ret
  90. }
// etcdProcessCluster groups the etcd processes created from a single
// etcdProcessClusterConfig.
type etcdProcessCluster struct {
	// cfg is the configuration the cluster was created from.
	cfg *etcdProcessClusterConfig
	// procs holds one entry per member; an entry can be nil if process
	// creation failed partway through newEtcdProcessCluster.
	procs []etcdProcess
}
// etcdProcessClusterConfig describes how to build the per-member process
// configurations and command-line flags for an e2e etcd cluster.
type etcdProcessClusterConfig struct {
	// execPath is the binary to run; defaults to binPath when empty.
	execPath string
	// dataDirPath is passed as --data-dir; when empty a temporary directory
	// is created for each member.
	dataDirPath string
	// keepDataDir is copied into every member's process config.
	// NOTE(review): how it is honored is not visible here — confirm in the
	// process implementation.
	keepDataDir bool
	// clusterSize is the number of members to configure.
	clusterSize int
	// baseScheme is the peer URL scheme before the optional TLS "s" suffix;
	// defaults to "http" when empty.
	baseScheme string
	// basePort defaults to etcdProcessBasePort when zero. Member i uses
	// basePort+5*i for clients, +1 for peers and +2 for metrics.
	basePort int
	// metricsURLScheme, when non-empty, adds --listen-metrics-urls using
	// this scheme.
	metricsURLScheme string
	// snapshotCount is passed as --snapshot-count; default is 10000.
	snapshotCount int
	// clientTLS selects the client URL scheme(s) and whether client TLS
	// flags are emitted.
	clientTLS clientConnType
	// clientCertAuthEnabled adds --client-cert-auth when certificate-file
	// client TLS is in use.
	clientCertAuthEnabled bool
	// isPeerTLS enables TLS for peer URLs.
	isPeerTLS bool
	// isPeerAutoTLS uses --peer-auto-tls instead of peer certificate files.
	isPeerAutoTLS bool
	// isClientAutoTLS uses --auto-tls instead of client certificate files.
	isClientAutoTLS bool
	// isClientCRL adds --client-crl-file and --client-cert-auth.
	isClientCRL bool
	// noCN is not consumed in this part of the file.
	// NOTE(review): presumably selects a certificate without a CommonName —
	// confirm at the use site.
	noCN bool
	// cipherSuites, when non-empty, is joined with "," for --cipher-suites.
	cipherSuites []string
	// forceNewCluster adds --force-new-cluster.
	forceNewCluster bool
	// initialToken is passed as --initial-cluster-token.
	initialToken string
	// quotaBackendBytes, when positive, is passed as --quota-backend-bytes.
	quotaBackendBytes int64
	// noStrictReconfig adds --strict-reconfig-check=false.
	noStrictReconfig bool
	// enableV2 adds --enable-v2.
	enableV2 bool
	// initialCorruptCheck adds --experimental-initial-corrupt-check.
	initialCorruptCheck bool
	// authTokenOpts, when non-empty, is passed as --auth-token.
	authTokenOpts string
}
  120. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  121. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  122. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  123. etcdCfgs := cfg.etcdServerProcessConfigs()
  124. epc := &etcdProcessCluster{
  125. cfg: cfg,
  126. procs: make([]etcdProcess, cfg.clusterSize),
  127. }
  128. // launch etcd processes
  129. for i := range etcdCfgs {
  130. proc, err := newEtcdProcess(etcdCfgs[i])
  131. if err != nil {
  132. epc.Close()
  133. return nil, err
  134. }
  135. epc.procs[i] = proc
  136. }
  137. if err := epc.Start(); err != nil {
  138. return nil, err
  139. }
  140. return epc, nil
  141. }
  142. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  143. if cfg.clientTLS == clientTLS {
  144. return "https"
  145. }
  146. return "http"
  147. }
  148. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  149. peerScheme := cfg.baseScheme
  150. if peerScheme == "" {
  151. peerScheme = "http"
  152. }
  153. if cfg.isPeerTLS {
  154. peerScheme += "s"
  155. }
  156. return peerScheme
  157. }
  158. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  159. if cfg.basePort == 0 {
  160. cfg.basePort = etcdProcessBasePort
  161. }
  162. if cfg.execPath == "" {
  163. cfg.execPath = binPath
  164. }
  165. if cfg.snapshotCount == 0 {
  166. cfg.snapshotCount = 10000
  167. }
  168. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  169. initialCluster := make([]string, cfg.clusterSize)
  170. for i := 0; i < cfg.clusterSize; i++ {
  171. var curls []string
  172. var curl, curltls string
  173. port := cfg.basePort + 5*i
  174. curlHost := fmt.Sprintf("localhost:%d", port)
  175. switch cfg.clientTLS {
  176. case clientNonTLS, clientTLS:
  177. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  178. curls = []string{curl}
  179. case clientTLSAndNonTLS:
  180. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  181. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  182. curls = []string{curl, curltls}
  183. }
  184. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  185. name := fmt.Sprintf("testname%d", i)
  186. dataDirPath := cfg.dataDirPath
  187. if cfg.dataDirPath == "" {
  188. var derr error
  189. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  190. if derr != nil {
  191. panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
  192. }
  193. }
  194. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  195. args := []string{
  196. "--name", name,
  197. "--listen-client-urls", strings.Join(curls, ","),
  198. "--advertise-client-urls", strings.Join(curls, ","),
  199. "--listen-peer-urls", purl.String(),
  200. "--initial-advertise-peer-urls", purl.String(),
  201. "--initial-cluster-token", cfg.initialToken,
  202. "--data-dir", dataDirPath,
  203. "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
  204. }
  205. args = addV2Args(args)
  206. if cfg.forceNewCluster {
  207. args = append(args, "--force-new-cluster")
  208. }
  209. if cfg.quotaBackendBytes > 0 {
  210. args = append(args,
  211. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  212. )
  213. }
  214. if cfg.noStrictReconfig {
  215. args = append(args, "--strict-reconfig-check=false")
  216. }
  217. if cfg.enableV2 {
  218. args = append(args, "--enable-v2")
  219. }
  220. if cfg.initialCorruptCheck {
  221. args = append(args, "--experimental-initial-corrupt-check")
  222. }
  223. var murl string
  224. if cfg.metricsURLScheme != "" {
  225. murl = (&url.URL{
  226. Scheme: cfg.metricsURLScheme,
  227. Host: fmt.Sprintf("localhost:%d", port+2),
  228. }).String()
  229. args = append(args, "--listen-metrics-urls", murl)
  230. }
  231. args = append(args, cfg.tlsArgs()...)
  232. if cfg.authTokenOpts != "" {
  233. args = append(args, "--auth-token", cfg.authTokenOpts)
  234. }
  235. etcdCfgs[i] = &etcdServerProcessConfig{
  236. execPath: cfg.execPath,
  237. args: args,
  238. tlsArgs: cfg.tlsArgs(),
  239. dataDirPath: dataDirPath,
  240. keepDataDir: cfg.keepDataDir,
  241. name: name,
  242. purl: purl,
  243. acurl: curl,
  244. murl: murl,
  245. initialToken: cfg.initialToken,
  246. }
  247. }
  248. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  249. for i := range etcdCfgs {
  250. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  251. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  252. }
  253. return etcdCfgs
  254. }
  255. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  256. if cfg.clientTLS != clientNonTLS {
  257. if cfg.isClientAutoTLS {
  258. args = append(args, "--auto-tls")
  259. } else {
  260. tlsClientArgs := []string{
  261. "--cert-file", certPath,
  262. "--key-file", privateKeyPath,
  263. "--trusted-ca-file", caPath,
  264. }
  265. args = append(args, tlsClientArgs...)
  266. if cfg.clientCertAuthEnabled {
  267. args = append(args, "--client-cert-auth")
  268. }
  269. }
  270. }
  271. if cfg.isPeerTLS {
  272. if cfg.isPeerAutoTLS {
  273. args = append(args, "--peer-auto-tls")
  274. } else {
  275. tlsPeerArgs := []string{
  276. "--peer-cert-file", certPath,
  277. "--peer-key-file", privateKeyPath,
  278. "--peer-trusted-ca-file", caPath,
  279. }
  280. args = append(args, tlsPeerArgs...)
  281. }
  282. }
  283. if cfg.isClientCRL {
  284. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  285. }
  286. if len(cfg.cipherSuites) > 0 {
  287. args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
  288. }
  289. return args
  290. }
  291. func (epc *etcdProcessCluster) EndpointsV2() []string {
  292. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  293. }
  294. func (epc *etcdProcessCluster) EndpointsV3() []string {
  295. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  296. }
  297. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  298. for _, p := range epc.procs {
  299. ret = append(ret, f(p)...)
  300. }
  301. return ret
  302. }
  303. func (epc *etcdProcessCluster) Start() error {
  304. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  305. }
  306. func (epc *etcdProcessCluster) Restart() error {
  307. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  308. }
  309. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  310. readyC := make(chan error, len(epc.procs))
  311. for i := range epc.procs {
  312. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  313. }
  314. for range epc.procs {
  315. if err := <-readyC; err != nil {
  316. epc.Close()
  317. return err
  318. }
  319. }
  320. return nil
  321. }
  322. func (epc *etcdProcessCluster) Stop() (err error) {
  323. for _, p := range epc.procs {
  324. if p == nil {
  325. continue
  326. }
  327. if curErr := p.Stop(); curErr != nil {
  328. if err != nil {
  329. err = fmt.Errorf("%v; %v", err, curErr)
  330. } else {
  331. err = curErr
  332. }
  333. }
  334. }
  335. return err
  336. }
  337. func (epc *etcdProcessCluster) Close() error {
  338. err := epc.Stop()
  339. for _, p := range epc.procs {
  340. // p is nil when newEtcdProcess fails in the middle
  341. // Close still gets called to clean up test data
  342. if p == nil {
  343. continue
  344. }
  345. if cerr := p.Close(); cerr != nil {
  346. err = cerr
  347. }
  348. }
  349. return err
  350. }
  351. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  352. for _, p := range epc.procs {
  353. ret = p.WithStopSignal(sig)
  354. }
  355. return ret
  356. }