cluster_test.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. )
  23. const etcdProcessBasePort = 20000
  24. type clientConnType int
  25. const (
  26. clientNonTLS clientConnType = iota
  27. clientTLS
  28. clientTLSAndNonTLS
  29. )
  30. var (
  31. configNoTLS = etcdProcessClusterConfig{
  32. clusterSize: 3,
  33. initialToken: "new",
  34. }
  35. configAutoTLS = etcdProcessClusterConfig{
  36. clusterSize: 3,
  37. isPeerTLS: true,
  38. isPeerAutoTLS: true,
  39. initialToken: "new",
  40. }
  41. configTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. clientTLS: clientTLS,
  44. isPeerTLS: true,
  45. initialToken: "new",
  46. }
  47. configClientTLS = etcdProcessClusterConfig{
  48. clusterSize: 3,
  49. clientTLS: clientTLS,
  50. initialToken: "new",
  51. }
  52. configClientBoth = etcdProcessClusterConfig{
  53. clusterSize: 1,
  54. clientTLS: clientTLSAndNonTLS,
  55. initialToken: "new",
  56. }
  57. configClientAutoTLS = etcdProcessClusterConfig{
  58. clusterSize: 1,
  59. isClientAutoTLS: true,
  60. clientTLS: clientTLS,
  61. initialToken: "new",
  62. }
  63. configPeerTLS = etcdProcessClusterConfig{
  64. clusterSize: 3,
  65. isPeerTLS: true,
  66. initialToken: "new",
  67. }
  68. configClientTLSCertAuth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. clientTLS: clientTLS,
  71. initialToken: "new",
  72. clientCertAuthEnabled: true,
  73. }
  74. )
  75. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  76. ret := cfg
  77. ret.clusterSize = 1
  78. return &ret
  79. }
  80. type etcdProcessCluster struct {
  81. cfg *etcdProcessClusterConfig
  82. procs []etcdProcess
  83. }
  84. type etcdProcessClusterConfig struct {
  85. execPath string
  86. dataDirPath string
  87. keepDataDir bool
  88. clusterSize int
  89. baseScheme string
  90. basePort int
  91. metricsURLScheme string
  92. snapCount int // default is 10000
  93. clientTLS clientConnType
  94. clientCertAuthEnabled bool
  95. isPeerTLS bool
  96. isPeerAutoTLS bool
  97. isClientAutoTLS bool
  98. isClientCRL bool
  99. forceNewCluster bool
  100. initialToken string
  101. quotaBackendBytes int64
  102. noStrictReconfig bool
  103. initialCorruptCheck bool
  104. }
  105. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  106. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  107. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  108. etcdCfgs := cfg.etcdServerProcessConfigs()
  109. epc := &etcdProcessCluster{
  110. cfg: cfg,
  111. procs: make([]etcdProcess, cfg.clusterSize),
  112. }
  113. // launch etcd processes
  114. for i := range etcdCfgs {
  115. proc, err := newEtcdProcess(etcdCfgs[i])
  116. if err != nil {
  117. epc.Close()
  118. return nil, err
  119. }
  120. epc.procs[i] = proc
  121. }
  122. if err := epc.Start(); err != nil {
  123. return nil, err
  124. }
  125. return epc, nil
  126. }
  127. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  128. if cfg.clientTLS == clientTLS {
  129. return "https"
  130. }
  131. return "http"
  132. }
  133. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  134. peerScheme := cfg.baseScheme
  135. if peerScheme == "" {
  136. peerScheme = "http"
  137. }
  138. if cfg.isPeerTLS {
  139. peerScheme += "s"
  140. }
  141. return peerScheme
  142. }
  143. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  144. if cfg.basePort == 0 {
  145. cfg.basePort = etcdProcessBasePort
  146. }
  147. if cfg.execPath == "" {
  148. cfg.execPath = binPath
  149. }
  150. if cfg.snapCount == 0 {
  151. cfg.snapCount = etcdserver.DefaultSnapCount
  152. }
  153. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  154. initialCluster := make([]string, cfg.clusterSize)
  155. for i := 0; i < cfg.clusterSize; i++ {
  156. var curls []string
  157. var curl, curltls string
  158. port := cfg.basePort + 5*i
  159. curlHost := fmt.Sprintf("localhost:%d", port)
  160. switch cfg.clientTLS {
  161. case clientNonTLS, clientTLS:
  162. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  163. curls = []string{curl}
  164. case clientTLSAndNonTLS:
  165. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  166. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  167. curls = []string{curl, curltls}
  168. }
  169. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  170. name := fmt.Sprintf("testname%d", i)
  171. dataDirPath := cfg.dataDirPath
  172. if cfg.dataDirPath == "" {
  173. var derr error
  174. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  175. if derr != nil {
  176. panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
  177. }
  178. }
  179. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  180. args := []string{
  181. "--name", name,
  182. "--listen-client-urls", strings.Join(curls, ","),
  183. "--advertise-client-urls", strings.Join(curls, ","),
  184. "--listen-peer-urls", purl.String(),
  185. "--initial-advertise-peer-urls", purl.String(),
  186. "--initial-cluster-token", cfg.initialToken,
  187. "--data-dir", dataDirPath,
  188. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  189. }
  190. args = addV2Args(args)
  191. if cfg.forceNewCluster {
  192. args = append(args, "--force-new-cluster")
  193. }
  194. if cfg.quotaBackendBytes > 0 {
  195. args = append(args,
  196. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  197. )
  198. }
  199. if cfg.noStrictReconfig {
  200. args = append(args, "--strict-reconfig-check=false")
  201. }
  202. if cfg.initialCorruptCheck {
  203. args = append(args, "--experimental-initial-corrupt-check")
  204. }
  205. var murl string
  206. if cfg.metricsURLScheme != "" {
  207. murl = (&url.URL{
  208. Scheme: cfg.metricsURLScheme,
  209. Host: fmt.Sprintf("localhost:%d", port+2),
  210. }).String()
  211. args = append(args, "--listen-metrics-urls", murl)
  212. }
  213. args = append(args, cfg.tlsArgs()...)
  214. etcdCfgs[i] = &etcdServerProcessConfig{
  215. execPath: cfg.execPath,
  216. args: args,
  217. tlsArgs: cfg.tlsArgs(),
  218. dataDirPath: dataDirPath,
  219. keepDataDir: cfg.keepDataDir,
  220. name: name,
  221. purl: purl,
  222. acurl: curl,
  223. murl: murl,
  224. initialToken: cfg.initialToken,
  225. }
  226. }
  227. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  228. for i := range etcdCfgs {
  229. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  230. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  231. }
  232. return etcdCfgs
  233. }
  234. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  235. if cfg.clientTLS != clientNonTLS {
  236. if cfg.isClientAutoTLS {
  237. args = append(args, "--auto-tls")
  238. } else {
  239. tlsClientArgs := []string{
  240. "--cert-file", certPath,
  241. "--key-file", privateKeyPath,
  242. "--ca-file", caPath,
  243. }
  244. args = append(args, tlsClientArgs...)
  245. if cfg.clientCertAuthEnabled {
  246. args = append(args, "--client-cert-auth")
  247. }
  248. }
  249. }
  250. if cfg.isPeerTLS {
  251. if cfg.isPeerAutoTLS {
  252. args = append(args, "--peer-auto-tls")
  253. } else {
  254. tlsPeerArgs := []string{
  255. "--peer-cert-file", certPath,
  256. "--peer-key-file", privateKeyPath,
  257. "--peer-ca-file", caPath,
  258. }
  259. args = append(args, tlsPeerArgs...)
  260. }
  261. }
  262. if cfg.isClientCRL {
  263. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  264. }
  265. return args
  266. }
  267. func (epc *etcdProcessCluster) EndpointsV2() []string {
  268. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  269. }
  270. func (epc *etcdProcessCluster) EndpointsV3() []string {
  271. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  272. }
  273. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  274. for _, p := range epc.procs {
  275. ret = append(ret, f(p)...)
  276. }
  277. return ret
  278. }
  279. func (epc *etcdProcessCluster) Start() error {
  280. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  281. }
  282. func (epc *etcdProcessCluster) Restart() error {
  283. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  284. }
  285. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  286. readyC := make(chan error, len(epc.procs))
  287. for i := range epc.procs {
  288. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  289. }
  290. for range epc.procs {
  291. if err := <-readyC; err != nil {
  292. epc.Close()
  293. return err
  294. }
  295. }
  296. return nil
  297. }
  298. func (epc *etcdProcessCluster) Stop() (err error) {
  299. for _, p := range epc.procs {
  300. if p == nil {
  301. continue
  302. }
  303. if curErr := p.Stop(); curErr != nil {
  304. if err != nil {
  305. err = fmt.Errorf("%v; %v", err, curErr)
  306. } else {
  307. err = curErr
  308. }
  309. }
  310. }
  311. return err
  312. }
  313. func (epc *etcdProcessCluster) Close() error {
  314. err := epc.Stop()
  315. for _, p := range epc.procs {
  316. // p is nil when newEtcdProcess fails in the middle
  317. // Close still gets called to clean up test data
  318. if p == nil {
  319. continue
  320. }
  321. if cerr := p.Close(); cerr != nil {
  322. err = cerr
  323. }
  324. }
  325. return err
  326. }
  327. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  328. for _, p := range epc.procs {
  329. ret = p.WithStopSignal(sig)
  330. }
  331. return ret
  332. }