cluster_test.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. )
  23. const etcdProcessBasePort = 20000
  24. type clientConnType int
  25. const (
  26. clientNonTLS clientConnType = iota
  27. clientTLS
  28. clientTLSAndNonTLS
  29. )
  30. var (
  31. configNoTLS = etcdProcessClusterConfig{
  32. clusterSize: 3,
  33. initialToken: "new",
  34. }
  35. configAutoTLS = etcdProcessClusterConfig{
  36. clusterSize: 3,
  37. isPeerTLS: true,
  38. isPeerAutoTLS: true,
  39. initialToken: "new",
  40. }
  41. configTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. clientTLS: clientTLS,
  44. isPeerTLS: true,
  45. initialToken: "new",
  46. }
  47. configClientTLS = etcdProcessClusterConfig{
  48. clusterSize: 3,
  49. clientTLS: clientTLS,
  50. initialToken: "new",
  51. }
  52. configClientBoth = etcdProcessClusterConfig{
  53. clusterSize: 1,
  54. clientTLS: clientTLSAndNonTLS,
  55. initialToken: "new",
  56. }
  57. configClientAutoTLS = etcdProcessClusterConfig{
  58. clusterSize: 1,
  59. isClientAutoTLS: true,
  60. clientTLS: clientTLS,
  61. initialToken: "new",
  62. }
  63. configPeerTLS = etcdProcessClusterConfig{
  64. clusterSize: 3,
  65. isPeerTLS: true,
  66. initialToken: "new",
  67. }
  68. configClientTLSCertAuth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. clientTLS: clientTLS,
  71. initialToken: "new",
  72. clientCertAuthEnabled: true,
  73. }
  74. )
  75. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  76. ret := cfg
  77. ret.clusterSize = 1
  78. return &ret
  79. }
  80. type etcdProcessCluster struct {
  81. cfg *etcdProcessClusterConfig
  82. procs []etcdProcess
  83. }
  84. type etcdProcessClusterConfig struct {
  85. execPath string
  86. dataDirPath string
  87. keepDataDir bool
  88. clusterSize int
  89. baseScheme string
  90. basePort int
  91. snapCount int // default is 10000
  92. clientTLS clientConnType
  93. clientCertAuthEnabled bool
  94. isPeerTLS bool
  95. isPeerAutoTLS bool
  96. isClientAutoTLS bool
  97. isClientCRL bool
  98. forceNewCluster bool
  99. initialToken string
  100. quotaBackendBytes int64
  101. noStrictReconfig bool
  102. }
  103. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  104. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  105. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  106. etcdCfgs := cfg.etcdServerProcessConfigs()
  107. epc := &etcdProcessCluster{
  108. cfg: cfg,
  109. procs: make([]etcdProcess, cfg.clusterSize),
  110. }
  111. // launch etcd processes
  112. for i := range etcdCfgs {
  113. proc, err := newEtcdProcess(etcdCfgs[i])
  114. if err != nil {
  115. epc.Close()
  116. return nil, err
  117. }
  118. epc.procs[i] = proc
  119. }
  120. if err := epc.Start(); err != nil {
  121. return nil, err
  122. }
  123. return epc, nil
  124. }
  125. func (cfg *etcdProcessClusterConfig) clientScheme() string {
  126. if cfg.clientTLS == clientTLS {
  127. return "https"
  128. }
  129. return "http"
  130. }
  131. func (cfg *etcdProcessClusterConfig) peerScheme() string {
  132. peerScheme := cfg.baseScheme
  133. if peerScheme == "" {
  134. peerScheme = "http"
  135. }
  136. if cfg.isPeerTLS {
  137. peerScheme += "s"
  138. }
  139. return peerScheme
  140. }
  141. func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
  142. if cfg.basePort == 0 {
  143. cfg.basePort = etcdProcessBasePort
  144. }
  145. if cfg.execPath == "" {
  146. cfg.execPath = binPath
  147. }
  148. if cfg.snapCount == 0 {
  149. cfg.snapCount = etcdserver.DefaultSnapCount
  150. }
  151. etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
  152. initialCluster := make([]string, cfg.clusterSize)
  153. for i := 0; i < cfg.clusterSize; i++ {
  154. var curls []string
  155. var curl, curltls string
  156. port := cfg.basePort + 4*i
  157. curlHost := fmt.Sprintf("localhost:%d", port)
  158. switch cfg.clientTLS {
  159. case clientNonTLS, clientTLS:
  160. curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
  161. curls = []string{curl}
  162. case clientTLSAndNonTLS:
  163. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  164. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  165. curls = []string{curl, curltls}
  166. }
  167. purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
  168. name := fmt.Sprintf("testname%d", i)
  169. dataDirPath := cfg.dataDirPath
  170. if cfg.dataDirPath == "" {
  171. var derr error
  172. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  173. if derr != nil {
  174. panic("could not get tempdir for datadir")
  175. }
  176. }
  177. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  178. args := []string{
  179. "--name", name,
  180. "--listen-client-urls", strings.Join(curls, ","),
  181. "--advertise-client-urls", strings.Join(curls, ","),
  182. "--listen-peer-urls", purl.String(),
  183. "--initial-advertise-peer-urls", purl.String(),
  184. "--initial-cluster-token", cfg.initialToken,
  185. "--data-dir", dataDirPath,
  186. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  187. }
  188. if cfg.forceNewCluster {
  189. args = append(args, "--force-new-cluster")
  190. }
  191. if cfg.quotaBackendBytes > 0 {
  192. args = append(args,
  193. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  194. )
  195. }
  196. if cfg.noStrictReconfig {
  197. args = append(args, "--strict-reconfig-check=false")
  198. }
  199. args = append(args, cfg.tlsArgs()...)
  200. etcdCfgs[i] = &etcdServerProcessConfig{
  201. execPath: cfg.execPath,
  202. args: args,
  203. tlsArgs: cfg.tlsArgs(),
  204. dataDirPath: dataDirPath,
  205. keepDataDir: cfg.keepDataDir,
  206. name: name,
  207. purl: purl,
  208. acurl: curl,
  209. initialToken: cfg.initialToken,
  210. }
  211. }
  212. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  213. for i := range etcdCfgs {
  214. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  215. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  216. }
  217. return etcdCfgs
  218. }
  219. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  220. if cfg.clientTLS != clientNonTLS {
  221. if cfg.isClientAutoTLS {
  222. args = append(args, "--auto-tls")
  223. } else {
  224. tlsClientArgs := []string{
  225. "--cert-file", certPath,
  226. "--key-file", privateKeyPath,
  227. "--ca-file", caPath,
  228. }
  229. args = append(args, tlsClientArgs...)
  230. if cfg.clientCertAuthEnabled {
  231. args = append(args, "--client-cert-auth")
  232. }
  233. }
  234. }
  235. if cfg.isPeerTLS {
  236. if cfg.isPeerAutoTLS {
  237. args = append(args, "--peer-auto-tls")
  238. } else {
  239. tlsPeerArgs := []string{
  240. "--peer-cert-file", certPath,
  241. "--peer-key-file", privateKeyPath,
  242. "--peer-ca-file", caPath,
  243. }
  244. args = append(args, tlsPeerArgs...)
  245. }
  246. }
  247. if cfg.isClientCRL {
  248. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  249. }
  250. return args
  251. }
  252. func (epc *etcdProcessCluster) EndpointsV2() []string {
  253. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
  254. }
  255. func (epc *etcdProcessCluster) EndpointsV3() []string {
  256. return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
  257. }
  258. func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
  259. for _, p := range epc.procs {
  260. ret = append(ret, f(p)...)
  261. }
  262. return ret
  263. }
  264. func (epc *etcdProcessCluster) Start() error {
  265. return epc.start(func(ep etcdProcess) error { return ep.Start() })
  266. }
  267. func (epc *etcdProcessCluster) Restart() error {
  268. return epc.start(func(ep etcdProcess) error { return ep.Restart() })
  269. }
  270. func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
  271. readyC := make(chan error, len(epc.procs))
  272. for i := range epc.procs {
  273. go func(n int) { readyC <- f(epc.procs[n]) }(i)
  274. }
  275. for range epc.procs {
  276. if err := <-readyC; err != nil {
  277. epc.Close()
  278. return err
  279. }
  280. }
  281. return nil
  282. }
  283. func (epc *etcdProcessCluster) Stop() (err error) {
  284. for _, p := range epc.procs {
  285. if p == nil {
  286. continue
  287. }
  288. if curErr := p.Stop(); curErr != nil {
  289. if err != nil {
  290. err = fmt.Errorf("%v; %v", err, curErr)
  291. } else {
  292. err = curErr
  293. }
  294. }
  295. }
  296. return err
  297. }
  298. func (epc *etcdProcessCluster) Close() error {
  299. err := epc.Stop()
  300. for _, p := range epc.procs {
  301. // p is nil when newEtcdProcess fails in the middle
  302. // Close still gets called to clean up test data
  303. if p == nil {
  304. continue
  305. }
  306. if cerr := p.Close(); cerr != nil {
  307. err = cerr
  308. }
  309. }
  310. return err
  311. }
  312. func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
  313. for _, p := range epc.procs {
  314. ret = p.WithStopSignal(sig)
  315. }
  316. return ret
  317. }