cluster_test.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. )
  23. const etcdProcessBasePort = 20000
  24. type clientConnType int
  25. const (
  26. clientNonTLS clientConnType = iota
  27. clientTLS
  28. clientTLSAndNonTLS
  29. )
  30. var (
  31. configNoTLS = etcdProcessClusterConfig{
  32. clusterSize: 3,
  33. initialToken: "new",
  34. }
  35. configAutoTLS = etcdProcessClusterConfig{
  36. clusterSize: 3,
  37. isPeerTLS: true,
  38. isPeerAutoTLS: true,
  39. initialToken: "new",
  40. }
  41. configTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. clientTLS: clientTLS,
  44. isPeerTLS: true,
  45. initialToken: "new",
  46. }
  47. configClientTLS = etcdProcessClusterConfig{
  48. clusterSize: 3,
  49. clientTLS: clientTLS,
  50. initialToken: "new",
  51. }
  52. configClientBoth = etcdProcessClusterConfig{
  53. clusterSize: 1,
  54. clientTLS: clientTLSAndNonTLS,
  55. initialToken: "new",
  56. }
  57. configClientAutoTLS = etcdProcessClusterConfig{
  58. clusterSize: 1,
  59. isClientAutoTLS: true,
  60. clientTLS: clientTLS,
  61. initialToken: "new",
  62. }
  63. configPeerTLS = etcdProcessClusterConfig{
  64. clusterSize: 3,
  65. isPeerTLS: true,
  66. initialToken: "new",
  67. }
  68. configClientTLSCertAuth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. clientTLS: clientTLS,
  71. initialToken: "new",
  72. clientCertAuthEnabled: true,
  73. }
  74. configJWT = etcdProcessClusterConfig{
  75. clusterSize: 1,
  76. initialToken: "new",
  77. authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s",
  78. }
  79. )
  80. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  81. ret := cfg
  82. ret.clusterSize = 1
  83. return &ret
  84. }
  85. type etcdProcessCluster struct {
  86. cfg *etcdProcessClusterConfig
  87. procs []etcdProcess
  88. }
  89. type etcdProcessClusterConfig struct {
  90. execPath string
  91. dataDirPath string
  92. keepDataDir bool
  93. clusterSize int
  94. baseScheme string
  95. basePort int
  96. metricsURLScheme string
  97. snapCount int // default is 10000
  98. clientTLS clientConnType
  99. clientCertAuthEnabled bool
  100. isPeerTLS bool
  101. isPeerAutoTLS bool
  102. isClientAutoTLS bool
  103. isClientCRL bool
  104. forceNewCluster bool
  105. initialToken string
  106. quotaBackendBytes int64
  107. noStrictReconfig bool
  108. initialCorruptCheck bool
  109. authTokenOpts string
  110. }
  111. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  112. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  113. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  114. etcdCfgs := cfg.etcdServerProcessConfigs()
  115. epc := &etcdProcessCluster{
  116. cfg: cfg,
  117. procs: make([]etcdProcess, cfg.clusterSize),
  118. }
  119. // launch etcd processes
  120. for i := range etcdCfgs {
  121. proc, err := newEtcdProcess(etcdCfgs[i])
  122. if err != nil {
  123. epc.Close()
  124. return nil, err
  125. }
  126. epc.procs[i] = proc
  127. }
  128. if err := epc.Start(); err != nil {
  129. return nil, err
  130. }
  131. return epc, nil
  132. }
  133. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  134. if cfg.clientTLS == clientTLS {
  135. return "https"
  136. }
  137. return "http"
  138. }
  139. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  140. peerScheme := cfg.baseScheme
  141. if peerScheme == "" {
  142. peerScheme = "http"
  143. }
  144. if cfg.isPeerTLS {
  145. peerScheme += "s"
  146. }
  147. return peerScheme
  148. }
  149. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  150. if cfg.basePort == 0 {
  151. cfg.basePort = etcdProcessBasePort
  152. }
  153. if cfg.execPath == "" {
  154. cfg.execPath = binPath
  155. }
  156. if cfg.snapCount == 0 {
  157. cfg.snapCount = etcdserver.DefaultSnapCount
  158. }
  159. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  160. initialCluster := make([]string, cfg.clusterSize)
  161. for i := 0; i < cfg.clusterSize; i++ {
  162. var curls []string
  163. var curl, curltls string
  164. port := cfg.basePort + 5*i
  165. curlHost := fmt.Sprintf("localhost:%d", port)
  166. switch cfg.clientTLS {
  167. case clientNonTLS, clientTLS:
  168. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  169. curls = []string{curl}
  170. case clientTLSAndNonTLS:
  171. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  172. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  173. curls = []string{curl, curltls}
  174. }
  175. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  176. name := fmt.Sprintf("testname%d", i)
  177. dataDirPath := cfg.dataDirPath
  178. if cfg.dataDirPath == "" {
  179. var derr error
  180. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  181. if derr != nil {
  182. panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
  183. }
  184. }
  185. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  186. args := []string{
  187. "--name", name,
  188. "--listen-client-urls", strings.Join(curls, ","),
  189. "--advertise-client-urls", strings.Join(curls, ","),
  190. "--listen-peer-urls", purl.String(),
  191. "--initial-advertise-peer-urls", purl.String(),
  192. "--initial-cluster-token", cfg.initialToken,
  193. "--data-dir", dataDirPath,
  194. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  195. }
  196. args = addV2Args(args)
  197. if cfg.forceNewCluster {
  198. args = append(args, "--force-new-cluster")
  199. }
  200. if cfg.quotaBackendBytes > 0 {
  201. args = append(args,
  202. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  203. )
  204. }
  205. if cfg.noStrictReconfig {
  206. args = append(args, "--strict-reconfig-check=false")
  207. }
  208. if cfg.initialCorruptCheck {
  209. args = append(args, "--experimental-initial-corrupt-check")
  210. }
  211. var murl string
  212. if cfg.metricsURLScheme != "" {
  213. murl = (&url.URL{
  214. Scheme: cfg.metricsURLScheme,
  215. Host: fmt.Sprintf("localhost:%d", port+2),
  216. }).String()
  217. args = append(args, "--listen-metrics-urls", murl)
  218. }
  219. args = append(args, cfg.tlsArgs()...)
  220. if cfg.authTokenOpts != "" {
  221. args = append(args, "--auth-token", cfg.authTokenOpts)
  222. }
  223. etcdCfgs[i] = &etcdServerProcessConfig{
  224. execPath: cfg.execPath,
  225. args: args,
  226. tlsArgs: cfg.tlsArgs(),
  227. dataDirPath: dataDirPath,
  228. keepDataDir: cfg.keepDataDir,
  229. name: name,
  230. purl: purl,
  231. acurl: curl,
  232. murl: murl,
  233. initialToken: cfg.initialToken,
  234. }
  235. }
  236. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  237. for i := range etcdCfgs {
  238. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  239. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  240. }
  241. return etcdCfgs
  242. }
  243. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  244. if cfg.clientTLS != clientNonTLS {
  245. if cfg.isClientAutoTLS {
  246. args = append(args, "--auto-tls")
  247. } else {
  248. tlsClientArgs := []string{
  249. "--cert-file", certPath,
  250. "--key-file", privateKeyPath,
  251. "--trusted-ca-file", caPath,
  252. }
  253. args = append(args, tlsClientArgs...)
  254. if cfg.clientCertAuthEnabled {
  255. args = append(args, "--client-cert-auth")
  256. }
  257. }
  258. }
  259. if cfg.isPeerTLS {
  260. if cfg.isPeerAutoTLS {
  261. args = append(args, "--peer-auto-tls")
  262. } else {
  263. tlsPeerArgs := []string{
  264. "--peer-cert-file", certPath,
  265. "--peer-key-file", privateKeyPath,
  266. "--peer-trusted-ca-file", caPath,
  267. }
  268. args = append(args, tlsPeerArgs...)
  269. }
  270. }
  271. if cfg.isClientCRL {
  272. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  273. }
  274. return args
  275. }
  276. func (epc *etcdProcessCluster) EndpointsV2() []string {
  277. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  278. }
  279. func (epc *etcdProcessCluster) EndpointsV3() []string {
  280. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  281. }
  282. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  283. for _, p := range epc.procs {
  284. ret = append(ret, f(p)...)
  285. }
  286. return ret
  287. }
  288. func (epc *etcdProcessCluster) Start() error {
  289. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  290. }
  291. func (epc *etcdProcessCluster) Restart() error {
  292. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  293. }
  294. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  295. readyC := make(chan error, len(epc.procs))
  296. for i := range epc.procs {
  297. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  298. }
  299. for range epc.procs {
  300. if err := <-readyC; err != nil {
  301. epc.Close()
  302. return err
  303. }
  304. }
  305. return nil
  306. }
  307. func (epc *etcdProcessCluster) Stop() (err error) {
  308. for _, p := range epc.procs {
  309. if p == nil {
  310. continue
  311. }
  312. if curErr := p.Stop(); curErr != nil {
  313. if err != nil {
  314. err = fmt.Errorf("%v; %v", err, curErr)
  315. } else {
  316. err = curErr
  317. }
  318. }
  319. }
  320. return err
  321. }
  322. func (epc *etcdProcessCluster) Close() error {
  323. err := epc.Stop()
  324. for _, p := range epc.procs {
  325. // p is nil when newEtcdProcess fails in the middle
  326. // Close still gets called to clean up test data
  327. if p == nil {
  328. continue
  329. }
  330. if cerr := p.Close(); cerr != nil {
  331. err = cerr
  332. }
  333. }
  334. return err
  335. }
  336. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  337. for _, p := range epc.procs {
  338. ret = p.WithStopSignal(sig)
  339. }
  340. return ret
  341. }