cluster_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "go.etcd.io/etcd/etcdserver"
  22. )
  23. const etcdProcessBasePort = 20000
  24. type clientConnType int
  25. const (
  26. clientNonTLS clientConnType = iota
  27. clientTLS
  28. clientTLSAndNonTLS
  29. )
  30. var (
  31. configNoTLS = etcdProcessClusterConfig{
  32. clusterSize: 3,
  33. initialToken: "new",
  34. }
  35. configAutoTLS = etcdProcessClusterConfig{
  36. clusterSize: 3,
  37. isPeerTLS: true,
  38. isPeerAutoTLS: true,
  39. initialToken: "new",
  40. }
  41. configTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. clientTLS: clientTLS,
  44. isPeerTLS: true,
  45. initialToken: "new",
  46. }
  47. configClientTLS = etcdProcessClusterConfig{
  48. clusterSize: 3,
  49. clientTLS: clientTLS,
  50. initialToken: "new",
  51. }
  52. configClientBoth = etcdProcessClusterConfig{
  53. clusterSize: 1,
  54. clientTLS: clientTLSAndNonTLS,
  55. initialToken: "new",
  56. }
  57. configClientAutoTLS = etcdProcessClusterConfig{
  58. clusterSize: 1,
  59. isClientAutoTLS: true,
  60. clientTLS: clientTLS,
  61. initialToken: "new",
  62. }
  63. configPeerTLS = etcdProcessClusterConfig{
  64. clusterSize: 3,
  65. isPeerTLS: true,
  66. initialToken: "new",
  67. }
  68. configClientTLSCertAuth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. clientTLS: clientTLS,
  71. initialToken: "new",
  72. clientCertAuthEnabled: true,
  73. }
  74. configClientTLSCertAuthWithNoCN = etcdProcessClusterConfig{
  75. clusterSize: 1,
  76. clientTLS: clientTLS,
  77. initialToken: "new",
  78. clientCertAuthEnabled: true,
  79. noCN: true,
  80. }
  81. configJWT = etcdProcessClusterConfig{
  82. clusterSize: 1,
  83. initialToken: "new",
  84. authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s",
  85. }
  86. )
  87. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  88. ret := cfg
  89. ret.clusterSize = 1
  90. return &ret
  91. }
  92. type etcdProcessCluster struct {
  93. cfg *etcdProcessClusterConfig
  94. procs []etcdProcess
  95. }
  96. type etcdProcessClusterConfig struct {
  97. execPath string
  98. dataDirPath string
  99. keepDataDir bool
  100. clusterSize int
  101. baseScheme string
  102. basePort int
  103. metricsURLScheme string
  104. snapshotCount int // default is 10000
  105. clientTLS clientConnType
  106. clientCertAuthEnabled bool
  107. isPeerTLS bool
  108. isPeerAutoTLS bool
  109. isClientAutoTLS bool
  110. isClientCRL bool
  111. noCN bool
  112. cipherSuites []string
  113. forceNewCluster bool
  114. initialToken string
  115. quotaBackendBytes int64
  116. noStrictReconfig bool
  117. initialCorruptCheck bool
  118. authTokenOpts string
  119. }
  120. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  121. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  122. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  123. etcdCfgs := cfg.etcdServerProcessConfigs()
  124. epc := &etcdProcessCluster{
  125. cfg: cfg,
  126. procs: make([]etcdProcess, cfg.clusterSize),
  127. }
  128. // launch etcd processes
  129. for i := range etcdCfgs {
  130. proc, err := newEtcdProcess(etcdCfgs[i])
  131. if err != nil {
  132. epc.Close()
  133. return nil, err
  134. }
  135. epc.procs[i] = proc
  136. }
  137. if err := epc.Start(); err != nil {
  138. return nil, err
  139. }
  140. return epc, nil
  141. }
  142. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  143. if cfg.clientTLS == clientTLS {
  144. return "https"
  145. }
  146. return "http"
  147. }
  148. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  149. peerScheme := cfg.baseScheme
  150. if peerScheme == "" {
  151. peerScheme = "http"
  152. }
  153. if cfg.isPeerTLS {
  154. peerScheme += "s"
  155. }
  156. return peerScheme
  157. }
  158. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  159. if cfg.basePort == 0 {
  160. cfg.basePort = etcdProcessBasePort
  161. }
  162. if cfg.execPath == "" {
  163. cfg.execPath = binPath
  164. }
  165. if cfg.snapshotCount == 0 {
  166. cfg.snapshotCount = etcdserver.DefaultSnapshotCount
  167. }
  168. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  169. initialCluster := make([]string, cfg.clusterSize)
  170. for i := 0; i < cfg.clusterSize; i++ {
  171. var curls []string
  172. var curl, curltls string
  173. port := cfg.basePort + 5*i
  174. curlHost := fmt.Sprintf("localhost:%d", port)
  175. switch cfg.clientTLS {
  176. case clientNonTLS, clientTLS:
  177. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  178. curls = []string{curl}
  179. case clientTLSAndNonTLS:
  180. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  181. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  182. curls = []string{curl, curltls}
  183. }
  184. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  185. name := fmt.Sprintf("testname%d", i)
  186. dataDirPath := cfg.dataDirPath
  187. if cfg.dataDirPath == "" {
  188. var derr error
  189. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  190. if derr != nil {
  191. panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
  192. }
  193. }
  194. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  195. args := []string{
  196. "--name", name,
  197. "--listen-client-urls", strings.Join(curls, ","),
  198. "--advertise-client-urls", strings.Join(curls, ","),
  199. "--listen-peer-urls", purl.String(),
  200. "--initial-advertise-peer-urls", purl.String(),
  201. "--initial-cluster-token", cfg.initialToken,
  202. "--data-dir", dataDirPath,
  203. "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
  204. }
  205. args = addV2Args(args)
  206. if cfg.forceNewCluster {
  207. args = append(args, "--force-new-cluster")
  208. }
  209. if cfg.quotaBackendBytes > 0 {
  210. args = append(args,
  211. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  212. )
  213. }
  214. if cfg.noStrictReconfig {
  215. args = append(args, "--strict-reconfig-check=false")
  216. }
  217. if cfg.initialCorruptCheck {
  218. args = append(args, "--experimental-initial-corrupt-check")
  219. }
  220. var murl string
  221. if cfg.metricsURLScheme != "" {
  222. murl = (&url.URL{
  223. Scheme: cfg.metricsURLScheme,
  224. Host: fmt.Sprintf("localhost:%d", port+2),
  225. }).String()
  226. args = append(args, "--listen-metrics-urls", murl)
  227. }
  228. args = append(args, cfg.tlsArgs()...)
  229. if cfg.authTokenOpts != "" {
  230. args = append(args, "--auth-token", cfg.authTokenOpts)
  231. }
  232. etcdCfgs[i] = &etcdServerProcessConfig{
  233. execPath: cfg.execPath,
  234. args: args,
  235. tlsArgs: cfg.tlsArgs(),
  236. dataDirPath: dataDirPath,
  237. keepDataDir: cfg.keepDataDir,
  238. name: name,
  239. purl: purl,
  240. acurl: curl,
  241. murl: murl,
  242. initialToken: cfg.initialToken,
  243. }
  244. }
  245. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  246. for i := range etcdCfgs {
  247. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  248. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  249. }
  250. return etcdCfgs
  251. }
  252. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  253. if cfg.clientTLS != clientNonTLS {
  254. if cfg.isClientAutoTLS {
  255. args = append(args, "--auto-tls")
  256. } else {
  257. tlsClientArgs := []string{
  258. "--cert-file", certPath,
  259. "--key-file", privateKeyPath,
  260. "--trusted-ca-file", caPath,
  261. }
  262. args = append(args, tlsClientArgs...)
  263. if cfg.clientCertAuthEnabled {
  264. args = append(args, "--client-cert-auth")
  265. }
  266. }
  267. }
  268. if cfg.isPeerTLS {
  269. if cfg.isPeerAutoTLS {
  270. args = append(args, "--peer-auto-tls")
  271. } else {
  272. tlsPeerArgs := []string{
  273. "--peer-cert-file", certPath,
  274. "--peer-key-file", privateKeyPath,
  275. "--peer-trusted-ca-file", caPath,
  276. }
  277. args = append(args, tlsPeerArgs...)
  278. }
  279. }
  280. if cfg.isClientCRL {
  281. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  282. }
  283. if len(cfg.cipherSuites) > 0 {
  284. args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
  285. }
  286. return args
  287. }
  288. func (epc *etcdProcessCluster) EndpointsV2() []string {
  289. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  290. }
  291. func (epc *etcdProcessCluster) EndpointsV3() []string {
  292. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  293. }
  294. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  295. for _, p := range epc.procs {
  296. ret = append(ret, f(p)...)
  297. }
  298. return ret
  299. }
  300. func (epc *etcdProcessCluster) Start() error {
  301. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  302. }
  303. func (epc *etcdProcessCluster) Restart() error {
  304. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  305. }
  306. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  307. readyC := make(chan error, len(epc.procs))
  308. for i := range epc.procs {
  309. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  310. }
  311. for range epc.procs {
  312. if err := <-readyC; err != nil {
  313. epc.Close()
  314. return err
  315. }
  316. }
  317. return nil
  318. }
  319. func (epc *etcdProcessCluster) Stop() (err error) {
  320. for _, p := range epc.procs {
  321. if p == nil {
  322. continue
  323. }
  324. if curErr := p.Stop(); curErr != nil {
  325. if err != nil {
  326. err = fmt.Errorf("%v; %v", err, curErr)
  327. } else {
  328. err = curErr
  329. }
  330. }
  331. }
  332. return err
  333. }
  334. func (epc *etcdProcessCluster) Close() error {
  335. err := epc.Stop()
  336. for _, p := range epc.procs {
  337. // p is nil when newEtcdProcess fails in the middle
  338. // Close still gets called to clean up test data
  339. if p == nil {
  340. continue
  341. }
  342. if cerr := p.Close(); cerr != nil {
  343. err = cerr
  344. }
  345. }
  346. return err
  347. }
  348. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  349. for _, p := range epc.procs {
  350. ret = p.WithStopSignal(sig)
  351. }
  352. return ret
  353. }