cluster_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. )
// etcdProcessBasePort is the base port used for a cluster whose configuration
// leaves basePort unset (zero).
const etcdProcessBasePort = 20000
// clientConnType selects which URL schemes a member exposes to clients.
type clientConnType int

const (
	// clientNonTLS serves clients over plain "http" only. It is the zero
	// value, so it is what a config gets when clientTLS is left unset.
	clientNonTLS clientConnType = iota
	// clientTLS serves clients over "https" only.
	clientTLS
	// clientTLSAndNonTLS serves clients over both "http" and "https" URLs
	// that share the same host and port.
	clientTLSAndNonTLS
)
// Predefined cluster configurations. Each one only sets the fields listed;
// everything else keeps its zero value.
var (
	// configNoTLS is a three-member cluster with no TLS options set.
	configNoTLS = etcdProcessClusterConfig{
		clusterSize: 3,
		initialToken: "new",
	}
	// configAutoTLS is a three-member cluster with peer TLS enabled in
	// auto-TLS mode (--peer-auto-tls).
	configAutoTLS = etcdProcessClusterConfig{
		clusterSize: 3,
		isPeerTLS: true,
		isPeerAutoTLS: true,
		initialToken: "new",
	}
	// configTLS is a three-member cluster with both client TLS and peer TLS.
	configTLS = etcdProcessClusterConfig{
		clusterSize: 3,
		clientTLS: clientTLS,
		isPeerTLS: true,
		initialToken: "new",
	}
	// configClientTLS is a three-member cluster with client TLS only.
	configClientTLS = etcdProcessClusterConfig{
		clusterSize: 3,
		clientTLS: clientTLS,
		initialToken: "new",
	}
	// configClientBoth is a single member that serves clients over both
	// http and https.
	configClientBoth = etcdProcessClusterConfig{
		clusterSize: 1,
		clientTLS: clientTLSAndNonTLS,
		initialToken: "new",
	}
	// configClientAutoTLS is a single member with client TLS in auto-TLS
	// mode (--auto-tls).
	configClientAutoTLS = etcdProcessClusterConfig{
		clusterSize: 1,
		isClientAutoTLS: true,
		clientTLS: clientTLS,
		initialToken: "new",
	}
	// configPeerTLS is a three-member cluster with peer TLS only.
	configPeerTLS = etcdProcessClusterConfig{
		clusterSize: 3,
		isPeerTLS: true,
		initialToken: "new",
	}
	// configClientTLSCertAuth is a single member with client TLS and
	// client certificate authentication (--client-cert-auth).
	configClientTLSCertAuth = etcdProcessClusterConfig{
		clusterSize: 1,
		clientTLS: clientTLS,
		initialToken: "new",
		clientCertAuthEnabled: true,
	}
	// configJWT is a single member started with JWT --auth-token options;
	// the key paths are relative to the directory the tests run from.
	configJWT = etcdProcessClusterConfig{
		clusterSize: 1,
		initialToken: "new",
		authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s",
	}
)
  80. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  81. ret := cfg
  82. ret.clusterSize = 1
  83. return &ret
  84. }
// etcdProcessCluster is a set of etcd processes that are started, stopped and
// closed together.
type etcdProcessCluster struct {
	// cfg is the configuration the cluster was built from.
	cfg *etcdProcessClusterConfig
	// procs holds one entry per member. An entry may be nil if creating that
	// process failed while the cluster was being constructed.
	procs []etcdProcess
}
// etcdProcessClusterConfig describes a cluster of etcd processes. It is
// expanded into per-member command lines by etcdServerProcessConfigs.
type etcdProcessClusterConfig struct {
	// execPath is the etcd binary to run; empty selects binPath.
	execPath string
	// dataDirPath is passed as --data-dir to every member. When empty, each
	// member gets its own temporary directory instead.
	dataDirPath string
	// keepDataDir is copied into each member's process configuration.
	keepDataDir bool
	// clusterSize is the number of members.
	clusterSize int
	// baseScheme is the peer URL scheme before the TLS "s" suffix is added;
	// empty selects "http".
	baseScheme string
	// basePort is the first port of the cluster; zero selects
	// etcdProcessBasePort. Member i uses ports starting at basePort+5*i.
	basePort int
	// metricsURLScheme, when non-empty, adds --listen-metrics-urls with this
	// scheme on the member's port+2.
	metricsURLScheme string
	// snapshotCount is passed as --snapshot-count; zero selects
	// etcdserver.DefaultSnapshotCount.
	snapshotCount int
	// clientTLS selects the client URL schemes and whether client TLS flags
	// are emitted.
	clientTLS clientConnType
	// clientCertAuthEnabled adds --client-cert-auth when client TLS uses
	// certificate files (that is, not auto-TLS).
	clientCertAuthEnabled bool
	// isPeerTLS enables TLS between peers and appends "s" to the peer scheme.
	isPeerTLS bool
	// isPeerAutoTLS uses --peer-auto-tls instead of peer certificate files;
	// only consulted when isPeerTLS is set.
	isPeerAutoTLS bool
	// isClientAutoTLS uses --auto-tls instead of client certificate files;
	// only consulted when clientTLS is not clientNonTLS.
	isClientAutoTLS bool
	// isClientCRL adds --client-crl-file crlPath and --client-cert-auth.
	isClientCRL bool
	// cipherSuites, when non-empty, is joined with "," for --cipher-suites.
	cipherSuites []string
	// forceNewCluster adds --force-new-cluster.
	forceNewCluster bool
	// initialToken is passed as --initial-cluster-token.
	initialToken string
	// quotaBackendBytes, when positive, is passed as --quota-backend-bytes.
	quotaBackendBytes int64
	// noStrictReconfig adds --strict-reconfig-check=false.
	noStrictReconfig bool
	// initialCorruptCheck adds --experimental-initial-corrupt-check.
	initialCorruptCheck bool
	// authTokenOpts, when non-empty, is passed as --auth-token.
	authTokenOpts string
}
  112. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  113. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  114. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  115. etcdCfgs := cfg.etcdServerProcessConfigs()
  116. epc := &etcdProcessCluster{
  117. cfg: cfg,
  118. procs: make([]etcdProcess, cfg.clusterSize),
  119. }
  120. // launch etcd processes
  121. for i := range etcdCfgs {
  122. proc, err := newEtcdProcess(etcdCfgs[i])
  123. if err != nil {
  124. epc.Close()
  125. return nil, err
  126. }
  127. epc.procs[i] = proc
  128. }
  129. if err := epc.Start(); err != nil {
  130. return nil, err
  131. }
  132. return epc, nil
  133. }
  134. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  135. if cfg.clientTLS == clientTLS {
  136. return "https"
  137. }
  138. return "http"
  139. }
  140. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  141. peerScheme := cfg.baseScheme
  142. if peerScheme == "" {
  143. peerScheme = "http"
  144. }
  145. if cfg.isPeerTLS {
  146. peerScheme += "s"
  147. }
  148. return peerScheme
  149. }
  150. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  151. if cfg.basePort == 0 {
  152. cfg.basePort = etcdProcessBasePort
  153. }
  154. if cfg.execPath == "" {
  155. cfg.execPath = binPath
  156. }
  157. if cfg.snapshotCount == 0 {
  158. cfg.snapshotCount = etcdserver.DefaultSnapshotCount
  159. }
  160. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  161. initialCluster := make([]string, cfg.clusterSize)
  162. for i := 0; i < cfg.clusterSize; i++ {
  163. var curls []string
  164. var curl, curltls string
  165. port := cfg.basePort + 5*i
  166. curlHost := fmt.Sprintf("localhost:%d", port)
  167. switch cfg.clientTLS {
  168. case clientNonTLS, clientTLS:
  169. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  170. curls = []string{curl}
  171. case clientTLSAndNonTLS:
  172. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  173. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  174. curls = []string{curl, curltls}
  175. }
  176. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  177. name := fmt.Sprintf("testname%d", i)
  178. dataDirPath := cfg.dataDirPath
  179. if cfg.dataDirPath == "" {
  180. var derr error
  181. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  182. if derr != nil {
  183. panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
  184. }
  185. }
  186. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  187. args := []string{
  188. "--name", name,
  189. "--listen-client-urls", strings.Join(curls, ","),
  190. "--advertise-client-urls", strings.Join(curls, ","),
  191. "--listen-peer-urls", purl.String(),
  192. "--initial-advertise-peer-urls", purl.String(),
  193. "--initial-cluster-token", cfg.initialToken,
  194. "--data-dir", dataDirPath,
  195. "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
  196. }
  197. args = addV2Args(args)
  198. if cfg.forceNewCluster {
  199. args = append(args, "--force-new-cluster")
  200. }
  201. if cfg.quotaBackendBytes > 0 {
  202. args = append(args,
  203. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  204. )
  205. }
  206. if cfg.noStrictReconfig {
  207. args = append(args, "--strict-reconfig-check=false")
  208. }
  209. if cfg.initialCorruptCheck {
  210. args = append(args, "--experimental-initial-corrupt-check")
  211. }
  212. var murl string
  213. if cfg.metricsURLScheme != "" {
  214. murl = (&url.URL{
  215. Scheme: cfg.metricsURLScheme,
  216. Host: fmt.Sprintf("localhost:%d", port+2),
  217. }).String()
  218. args = append(args, "--listen-metrics-urls", murl)
  219. }
  220. args = append(args, cfg.tlsArgs()...)
  221. if cfg.authTokenOpts != "" {
  222. args = append(args, "--auth-token", cfg.authTokenOpts)
  223. }
  224. etcdCfgs[i] = &etcdServerProcessConfig{
  225. execPath: cfg.execPath,
  226. args: args,
  227. tlsArgs: cfg.tlsArgs(),
  228. dataDirPath: dataDirPath,
  229. keepDataDir: cfg.keepDataDir,
  230. name: name,
  231. purl: purl,
  232. acurl: curl,
  233. murl: murl,
  234. initialToken: cfg.initialToken,
  235. }
  236. }
  237. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  238. for i := range etcdCfgs {
  239. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  240. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  241. }
  242. return etcdCfgs
  243. }
  244. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  245. if cfg.clientTLS != clientNonTLS {
  246. if cfg.isClientAutoTLS {
  247. args = append(args, "--auto-tls")
  248. } else {
  249. tlsClientArgs := []string{
  250. "--cert-file", certPath,
  251. "--key-file", privateKeyPath,
  252. "--trusted-ca-file", caPath,
  253. }
  254. args = append(args, tlsClientArgs...)
  255. if cfg.clientCertAuthEnabled {
  256. args = append(args, "--client-cert-auth")
  257. }
  258. }
  259. }
  260. if cfg.isPeerTLS {
  261. if cfg.isPeerAutoTLS {
  262. args = append(args, "--peer-auto-tls")
  263. } else {
  264. tlsPeerArgs := []string{
  265. "--peer-cert-file", certPath,
  266. "--peer-key-file", privateKeyPath,
  267. "--peer-trusted-ca-file", caPath,
  268. }
  269. args = append(args, tlsPeerArgs...)
  270. }
  271. }
  272. if cfg.isClientCRL {
  273. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  274. }
  275. if len(cfg.cipherSuites) > 0 {
  276. args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
  277. }
  278. return args
  279. }
  280. func (epc *etcdProcessCluster) EndpointsV2() []string {
  281. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  282. }
  283. func (epc *etcdProcessCluster) EndpointsV3() []string {
  284. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  285. }
  286. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  287. for _, p := range epc.procs {
  288. ret = append(ret, f(p)...)
  289. }
  290. return ret
  291. }
  292. func (epc *etcdProcessCluster) Start() error {
  293. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  294. }
  295. func (epc *etcdProcessCluster) Restart() error {
  296. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  297. }
  298. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  299. readyC := make(chan error, len(epc.procs))
  300. for i := range epc.procs {
  301. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  302. }
  303. for range epc.procs {
  304. if err := <-readyC; err != nil {
  305. epc.Close()
  306. return err
  307. }
  308. }
  309. return nil
  310. }
  311. func (epc *etcdProcessCluster) Stop() (err error) {
  312. for _, p := range epc.procs {
  313. if p == nil {
  314. continue
  315. }
  316. if curErr := p.Stop(); curErr != nil {
  317. if err != nil {
  318. err = fmt.Errorf("%v; %v", err, curErr)
  319. } else {
  320. err = curErr
  321. }
  322. }
  323. }
  324. return err
  325. }
  326. func (epc *etcdProcessCluster) Close() error {
  327. err := epc.Stop()
  328. for _, p := range epc.procs {
  329. // p is nil when newEtcdProcess fails in the middle
  330. // Close still gets called to clean up test data
  331. if p == nil {
  332. continue
  333. }
  334. if cerr := p.Close(); cerr != nil {
  335. err = cerr
  336. }
  337. }
  338. return err
  339. }
  340. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  341. for _, p := range epc.procs {
  342. ret = p.WithStopSignal(sig)
  343. }
  344. return ret
  345. }