cluster_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "go.etcd.io/etcd/etcdserver"
  22. )
  23. const etcdProcessBasePort = 20000
  24. type clientConnType int
  25. const (
  26. clientNonTLS clientConnType = iota
  27. clientTLS
  28. clientTLSAndNonTLS
  29. )
  30. var (
  31. configNoTLS = etcdProcessClusterConfig{
  32. clusterSize: 3,
  33. initialToken: "new",
  34. }
  35. configAutoTLS = etcdProcessClusterConfig{
  36. clusterSize: 3,
  37. isPeerTLS: true,
  38. isPeerAutoTLS: true,
  39. initialToken: "new",
  40. }
  41. configTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. clientTLS: clientTLS,
  44. isPeerTLS: true,
  45. initialToken: "new",
  46. }
  47. configClientTLS = etcdProcessClusterConfig{
  48. clusterSize: 3,
  49. clientTLS: clientTLS,
  50. initialToken: "new",
  51. }
  52. configClientBoth = etcdProcessClusterConfig{
  53. clusterSize: 1,
  54. clientTLS: clientTLSAndNonTLS,
  55. initialToken: "new",
  56. }
  57. configClientAutoTLS = etcdProcessClusterConfig{
  58. clusterSize: 1,
  59. isClientAutoTLS: true,
  60. clientTLS: clientTLS,
  61. initialToken: "new",
  62. }
  63. configPeerTLS = etcdProcessClusterConfig{
  64. clusterSize: 3,
  65. isPeerTLS: true,
  66. initialToken: "new",
  67. }
  68. configClientTLSCertAuth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. clientTLS: clientTLS,
  71. initialToken: "new",
  72. clientCertAuthEnabled: true,
  73. }
  74. configClientTLSCertAuthWithNoCN = etcdProcessClusterConfig{
  75. clusterSize: 1,
  76. clientTLS: clientTLS,
  77. initialToken: "new",
  78. clientCertAuthEnabled: true,
  79. noCN: true,
  80. }
  81. configJWT = etcdProcessClusterConfig{
  82. clusterSize: 1,
  83. initialToken: "new",
  84. authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s",
  85. }
  86. )
  87. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  88. ret := cfg
  89. ret.clusterSize = 1
  90. return &ret
  91. }
  92. type etcdProcessCluster struct {
  93. cfg *etcdProcessClusterConfig
  94. procs []etcdProcess
  95. }
  96. type etcdProcessClusterConfig struct {
  97. execPath string
  98. dataDirPath string
  99. keepDataDir bool
  100. clusterSize int
  101. baseScheme string
  102. basePort int
  103. metricsURLScheme string
  104. snapshotCount int // default is 10000
  105. clientTLS clientConnType
  106. clientCertAuthEnabled bool
  107. isPeerTLS bool
  108. isPeerAutoTLS bool
  109. isClientAutoTLS bool
  110. isClientCRL bool
  111. noCN bool
  112. cipherSuites []string
  113. forceNewCluster bool
  114. initialToken string
  115. quotaBackendBytes int64
  116. noStrictReconfig bool
  117. enableV2 bool
  118. initialCorruptCheck bool
  119. authTokenOpts string
  120. }
  121. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  122. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  123. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  124. etcdCfgs := cfg.etcdServerProcessConfigs()
  125. epc := &etcdProcessCluster{
  126. cfg: cfg,
  127. procs: make([]etcdProcess, cfg.clusterSize),
  128. }
  129. // launch etcd processes
  130. for i := range etcdCfgs {
  131. proc, err := newEtcdProcess(etcdCfgs[i])
  132. if err != nil {
  133. epc.Close()
  134. return nil, err
  135. }
  136. epc.procs[i] = proc
  137. }
  138. if err := epc.Start(); err != nil {
  139. return nil, err
  140. }
  141. return epc, nil
  142. }
  143. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  144. if cfg.clientTLS == clientTLS {
  145. return "https"
  146. }
  147. return "http"
  148. }
  149. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  150. peerScheme := cfg.baseScheme
  151. if peerScheme == "" {
  152. peerScheme = "http"
  153. }
  154. if cfg.isPeerTLS {
  155. peerScheme += "s"
  156. }
  157. return peerScheme
  158. }
  159. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  160. if cfg.basePort == 0 {
  161. cfg.basePort = etcdProcessBasePort
  162. }
  163. if cfg.execPath == "" {
  164. cfg.execPath = binPath
  165. }
  166. if cfg.snapshotCount == 0 {
  167. cfg.snapshotCount = etcdserver.DefaultSnapshotCount
  168. }
  169. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  170. initialCluster := make([]string, cfg.clusterSize)
  171. for i := 0; i < cfg.clusterSize; i++ {
  172. var curls []string
  173. var curl, curltls string
  174. port := cfg.basePort + 5*i
  175. curlHost := fmt.Sprintf("localhost:%d", port)
  176. switch cfg.clientTLS {
  177. case clientNonTLS, clientTLS:
  178. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  179. curls = []string{curl}
  180. case clientTLSAndNonTLS:
  181. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  182. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  183. curls = []string{curl, curltls}
  184. }
  185. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  186. name := fmt.Sprintf("testname%d", i)
  187. dataDirPath := cfg.dataDirPath
  188. if cfg.dataDirPath == "" {
  189. var derr error
  190. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  191. if derr != nil {
  192. panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
  193. }
  194. }
  195. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  196. args := []string{
  197. "--name", name,
  198. "--listen-client-urls", strings.Join(curls, ","),
  199. "--advertise-client-urls", strings.Join(curls, ","),
  200. "--listen-peer-urls", purl.String(),
  201. "--initial-advertise-peer-urls", purl.String(),
  202. "--initial-cluster-token", cfg.initialToken,
  203. "--data-dir", dataDirPath,
  204. "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
  205. }
  206. args = addV2Args(args)
  207. if cfg.forceNewCluster {
  208. args = append(args, "--force-new-cluster")
  209. }
  210. if cfg.quotaBackendBytes > 0 {
  211. args = append(args,
  212. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  213. )
  214. }
  215. if cfg.noStrictReconfig {
  216. args = append(args, "--strict-reconfig-check=false")
  217. }
  218. if cfg.enableV2 {
  219. args = append(args, "--enable-v2")
  220. }
  221. if cfg.initialCorruptCheck {
  222. args = append(args, "--experimental-initial-corrupt-check")
  223. }
  224. var murl string
  225. if cfg.metricsURLScheme != "" {
  226. murl = (&url.URL{
  227. Scheme: cfg.metricsURLScheme,
  228. Host: fmt.Sprintf("localhost:%d", port+2),
  229. }).String()
  230. args = append(args, "--listen-metrics-urls", murl)
  231. }
  232. args = append(args, cfg.tlsArgs()...)
  233. if cfg.authTokenOpts != "" {
  234. args = append(args, "--auth-token", cfg.authTokenOpts)
  235. }
  236. etcdCfgs[i] = &etcdServerProcessConfig{
  237. execPath: cfg.execPath,
  238. args: args,
  239. tlsArgs: cfg.tlsArgs(),
  240. dataDirPath: dataDirPath,
  241. keepDataDir: cfg.keepDataDir,
  242. name: name,
  243. purl: purl,
  244. acurl: curl,
  245. murl: murl,
  246. initialToken: cfg.initialToken,
  247. }
  248. }
  249. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  250. for i := range etcdCfgs {
  251. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  252. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  253. }
  254. return etcdCfgs
  255. }
  256. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  257. if cfg.clientTLS != clientNonTLS {
  258. if cfg.isClientAutoTLS {
  259. args = append(args, "--auto-tls")
  260. } else {
  261. tlsClientArgs := []string{
  262. "--cert-file", certPath,
  263. "--key-file", privateKeyPath,
  264. "--trusted-ca-file", caPath,
  265. }
  266. args = append(args, tlsClientArgs...)
  267. if cfg.clientCertAuthEnabled {
  268. args = append(args, "--client-cert-auth")
  269. }
  270. }
  271. }
  272. if cfg.isPeerTLS {
  273. if cfg.isPeerAutoTLS {
  274. args = append(args, "--peer-auto-tls")
  275. } else {
  276. tlsPeerArgs := []string{
  277. "--peer-cert-file", certPath,
  278. "--peer-key-file", privateKeyPath,
  279. "--peer-trusted-ca-file", caPath,
  280. }
  281. args = append(args, tlsPeerArgs...)
  282. }
  283. }
  284. if cfg.isClientCRL {
  285. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  286. }
  287. if len(cfg.cipherSuites) > 0 {
  288. args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
  289. }
  290. return args
  291. }
  292. func (epc *etcdProcessCluster) EndpointsV2() []string {
  293. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  294. }
  295. func (epc *etcdProcessCluster) EndpointsV3() []string {
  296. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  297. }
  298. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  299. for _, p := range epc.procs {
  300. ret = append(ret, f(p)...)
  301. }
  302. return ret
  303. }
  304. func (epc *etcdProcessCluster) Start() error {
  305. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  306. }
  307. func (epc *etcdProcessCluster) Restart() error {
  308. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  309. }
  310. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  311. readyC := make(chan error, len(epc.procs))
  312. for i := range epc.procs {
  313. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  314. }
  315. for range epc.procs {
  316. if err := <-readyC; err != nil {
  317. epc.Close()
  318. return err
  319. }
  320. }
  321. return nil
  322. }
  323. func (epc *etcdProcessCluster) Stop() (err error) {
  324. for _, p := range epc.procs {
  325. if p == nil {
  326. continue
  327. }
  328. if curErr := p.Stop(); curErr != nil {
  329. if err != nil {
  330. err = fmt.Errorf("%v; %v", err, curErr)
  331. } else {
  332. err = curErr
  333. }
  334. }
  335. }
  336. return err
  337. }
  338. func (epc *etcdProcessCluster) Close() error {
  339. err := epc.Stop()
  340. for _, p := range epc.procs {
  341. // p is nil when newEtcdProcess fails in the middle
  342. // Close still gets called to clean up test data
  343. if p == nil {
  344. continue
  345. }
  346. if cerr := p.Close(); cerr != nil {
  347. err = cerr
  348. }
  349. }
  350. return err
  351. }
  352. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  353. for _, p := range epc.procs {
  354. ret = p.WithStopSignal(sig)
  355. }
  356. return ret
  357. }