cluster_test.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. )
  23. const etcdProcessBasePort = 20000
  24. type clientConnType int
  25. const (
  26. clientNonTLS clientConnType = iota
  27. clientTLS
  28. clientTLSAndNonTLS
  29. )
  30. var (
  31. configNoTLS = etcdProcessClusterConfig{
  32. clusterSize: 3,
  33. initialToken: "new",
  34. }
  35. configAutoTLS = etcdProcessClusterConfig{
  36. clusterSize: 3,
  37. isPeerTLS: true,
  38. isPeerAutoTLS: true,
  39. initialToken: "new",
  40. }
  41. configTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. clientTLS: clientTLS,
  44. isPeerTLS: true,
  45. initialToken: "new",
  46. }
  47. configClientTLS = etcdProcessClusterConfig{
  48. clusterSize: 3,
  49. clientTLS: clientTLS,
  50. initialToken: "new",
  51. }
  52. configClientBoth = etcdProcessClusterConfig{
  53. clusterSize: 1,
  54. clientTLS: clientTLSAndNonTLS,
  55. initialToken: "new",
  56. }
  57. configClientAutoTLS = etcdProcessClusterConfig{
  58. clusterSize: 1,
  59. isClientAutoTLS: true,
  60. clientTLS: clientTLS,
  61. initialToken: "new",
  62. }
  63. configPeerTLS = etcdProcessClusterConfig{
  64. clusterSize: 3,
  65. isPeerTLS: true,
  66. initialToken: "new",
  67. }
  68. configClientTLSCertAuth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. clientTLS: clientTLS,
  71. initialToken: "new",
  72. clientCertAuthEnabled: true,
  73. }
  74. )
  75. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  76. ret := cfg
  77. ret.clusterSize = 1
  78. return &ret
  79. }
  80. type etcdProcessCluster struct {
  81. cfg *etcdProcessClusterConfig
  82. procs []etcdProcess
  83. }
  84. type etcdProcessClusterConfig struct {
  85. execPath string
  86. dataDirPath string
  87. keepDataDir bool
  88. clusterSize int
  89. baseScheme string
  90. basePort int
  91. metricsURLScheme string
  92. snapCount int // default is 10000
  93. clientTLS clientConnType
  94. clientCertAuthEnabled bool
  95. isPeerTLS bool
  96. isPeerAutoTLS bool
  97. isClientAutoTLS bool
  98. isClientCRL bool
  99. cipherSuites []string
  100. forceNewCluster bool
  101. initialToken string
  102. quotaBackendBytes int64
  103. noStrictReconfig bool
  104. initialCorruptCheck bool
  105. }
  106. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  107. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  108. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  109. etcdCfgs := cfg.etcdServerProcessConfigs()
  110. epc := &etcdProcessCluster{
  111. cfg: cfg,
  112. procs: make([]etcdProcess, cfg.clusterSize),
  113. }
  114. // launch etcd processes
  115. for i := range etcdCfgs {
  116. proc, err := newEtcdProcess(etcdCfgs[i])
  117. if err != nil {
  118. epc.Close()
  119. return nil, err
  120. }
  121. epc.procs[i] = proc
  122. }
  123. if err := epc.Start(); err != nil {
  124. return nil, err
  125. }
  126. return epc, nil
  127. }
  128. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  129. if cfg.clientTLS == clientTLS {
  130. return "https"
  131. }
  132. return "http"
  133. }
  134. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  135. peerScheme := cfg.baseScheme
  136. if peerScheme == "" {
  137. peerScheme = "http"
  138. }
  139. if cfg.isPeerTLS {
  140. peerScheme += "s"
  141. }
  142. return peerScheme
  143. }
  144. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  145. if cfg.basePort == 0 {
  146. cfg.basePort = etcdProcessBasePort
  147. }
  148. if cfg.execPath == "" {
  149. cfg.execPath = binPath
  150. }
  151. if cfg.snapCount == 0 {
  152. cfg.snapCount = etcdserver.DefaultSnapCount
  153. }
  154. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  155. initialCluster := make([]string, cfg.clusterSize)
  156. for i := 0; i < cfg.clusterSize; i++ {
  157. var curls []string
  158. var curl, curltls string
  159. port := cfg.basePort + 5*i
  160. curlHost := fmt.Sprintf("localhost:%d", port)
  161. switch cfg.clientTLS {
  162. case clientNonTLS, clientTLS:
  163. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  164. curls = []string{curl}
  165. case clientTLSAndNonTLS:
  166. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  167. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  168. curls = []string{curl, curltls}
  169. }
  170. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  171. name := fmt.Sprintf("testname%d", i)
  172. dataDirPath := cfg.dataDirPath
  173. if cfg.dataDirPath == "" {
  174. var derr error
  175. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  176. if derr != nil {
  177. panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
  178. }
  179. }
  180. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  181. args := []string{
  182. "--name", name,
  183. "--listen-client-urls", strings.Join(curls, ","),
  184. "--advertise-client-urls", strings.Join(curls, ","),
  185. "--listen-peer-urls", purl.String(),
  186. "--initial-advertise-peer-urls", purl.String(),
  187. "--initial-cluster-token", cfg.initialToken,
  188. "--data-dir", dataDirPath,
  189. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  190. }
  191. args = addV2Args(args)
  192. if cfg.forceNewCluster {
  193. args = append(args, "--force-new-cluster")
  194. }
  195. if cfg.quotaBackendBytes > 0 {
  196. args = append(args,
  197. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  198. )
  199. }
  200. if cfg.noStrictReconfig {
  201. args = append(args, "--strict-reconfig-check=false")
  202. }
  203. if cfg.initialCorruptCheck {
  204. args = append(args, "--experimental-initial-corrupt-check")
  205. }
  206. var murl string
  207. if cfg.metricsURLScheme != "" {
  208. murl = (&url.URL{
  209. Scheme: cfg.metricsURLScheme,
  210. Host: fmt.Sprintf("localhost:%d", port+2),
  211. }).String()
  212. args = append(args, "--listen-metrics-urls", murl)
  213. }
  214. args = append(args, cfg.tlsArgs()...)
  215. etcdCfgs[i] = &etcdServerProcessConfig{
  216. execPath: cfg.execPath,
  217. args: args,
  218. tlsArgs: cfg.tlsArgs(),
  219. dataDirPath: dataDirPath,
  220. keepDataDir: cfg.keepDataDir,
  221. name: name,
  222. purl: purl,
  223. acurl: curl,
  224. murl: murl,
  225. initialToken: cfg.initialToken,
  226. }
  227. }
  228. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  229. for i := range etcdCfgs {
  230. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  231. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  232. }
  233. return etcdCfgs
  234. }
  235. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  236. if cfg.clientTLS != clientNonTLS {
  237. if cfg.isClientAutoTLS {
  238. args = append(args, "--auto-tls")
  239. } else {
  240. tlsClientArgs := []string{
  241. "--cert-file", certPath,
  242. "--key-file", privateKeyPath,
  243. "--ca-file", caPath,
  244. }
  245. args = append(args, tlsClientArgs...)
  246. if cfg.clientCertAuthEnabled {
  247. args = append(args, "--client-cert-auth")
  248. }
  249. }
  250. }
  251. if cfg.isPeerTLS {
  252. if cfg.isPeerAutoTLS {
  253. args = append(args, "--peer-auto-tls")
  254. } else {
  255. tlsPeerArgs := []string{
  256. "--peer-cert-file", certPath,
  257. "--peer-key-file", privateKeyPath,
  258. "--peer-ca-file", caPath,
  259. }
  260. args = append(args, tlsPeerArgs...)
  261. }
  262. }
  263. if cfg.isClientCRL {
  264. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  265. }
  266. if len(cfg.cipherSuites) > 0 {
  267. args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
  268. }
  269. return args
  270. }
  271. func (epc *etcdProcessCluster) EndpointsV2() []string {
  272. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  273. }
  274. func (epc *etcdProcessCluster) EndpointsV3() []string {
  275. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  276. }
  277. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  278. for _, p := range epc.procs {
  279. ret = append(ret, f(p)...)
  280. }
  281. return ret
  282. }
  283. func (epc *etcdProcessCluster) Start() error {
  284. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  285. }
  286. func (epc *etcdProcessCluster) Restart() error {
  287. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  288. }
  289. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  290. readyC := make(chan error, len(epc.procs))
  291. for i := range epc.procs {
  292. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  293. }
  294. for range epc.procs {
  295. if err := <-readyC; err != nil {
  296. epc.Close()
  297. return err
  298. }
  299. }
  300. return nil
  301. }
  302. func (epc *etcdProcessCluster) Stop() (err error) {
  303. for _, p := range epc.procs {
  304. if p == nil {
  305. continue
  306. }
  307. if curErr := p.Stop(); curErr != nil {
  308. if err != nil {
  309. err = fmt.Errorf("%v; %v", err, curErr)
  310. } else {
  311. err = curErr
  312. }
  313. }
  314. }
  315. return err
  316. }
  317. func (epc *etcdProcessCluster) Close() error {
  318. err := epc.Stop()
  319. for _, p := range epc.procs {
  320. // p is nil when newEtcdProcess fails in the middle
  321. // Close still gets called to clean up test data
  322. if p == nil {
  323. continue
  324. }
  325. if cerr := p.Close(); cerr != nil {
  326. err = cerr
  327. }
  328. }
  329. return err
  330. }
  331. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  332. for _, p := range epc.procs {
  333. ret = p.WithStopSignal(sig)
  334. }
  335. return ret
  336. }