etcd_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/pkg/expect"
  22. "github.com/coreos/etcd/pkg/fileutil"
  23. )
  24. const (
  25. etcdProcessBasePort = 20000
  26. certPath = "../integration/fixtures/server.crt"
  27. privateKeyPath = "../integration/fixtures/server.key.insecure"
  28. caPath = "../integration/fixtures/ca.crt"
  29. )
  30. type clientConnType int
  31. const (
  32. clientNonTLS clientConnType = iota
  33. clientTLS
  34. clientTLSAndNonTLS
  35. )
  36. var (
  37. configNoTLS = etcdProcessClusterConfig{
  38. clusterSize: 3,
  39. proxySize: 0,
  40. initialToken: "new",
  41. }
  42. configAutoTLS = etcdProcessClusterConfig{
  43. clusterSize: 3,
  44. isPeerTLS: true,
  45. isPeerAutoTLS: true,
  46. initialToken: "new",
  47. }
  48. configTLS = etcdProcessClusterConfig{
  49. clusterSize: 3,
  50. proxySize: 0,
  51. clientTLS: clientTLS,
  52. isPeerTLS: true,
  53. initialToken: "new",
  54. }
  55. configClientTLS = etcdProcessClusterConfig{
  56. clusterSize: 3,
  57. proxySize: 0,
  58. clientTLS: clientTLS,
  59. initialToken: "new",
  60. }
  61. configClientBoth = etcdProcessClusterConfig{
  62. clusterSize: 1,
  63. proxySize: 0,
  64. clientTLS: clientTLSAndNonTLS,
  65. initialToken: "new",
  66. }
  67. configClientAutoTLS = etcdProcessClusterConfig{
  68. clusterSize: 1,
  69. proxySize: 0,
  70. isClientAutoTLS: true,
  71. clientTLS: clientTLS,
  72. initialToken: "new",
  73. }
  74. configPeerTLS = etcdProcessClusterConfig{
  75. clusterSize: 3,
  76. proxySize: 0,
  77. isPeerTLS: true,
  78. initialToken: "new",
  79. }
  80. configWithProxy = etcdProcessClusterConfig{
  81. clusterSize: 3,
  82. proxySize: 1,
  83. initialToken: "new",
  84. }
  85. configWithProxyTLS = etcdProcessClusterConfig{
  86. clusterSize: 3,
  87. proxySize: 1,
  88. clientTLS: clientTLS,
  89. isPeerTLS: true,
  90. initialToken: "new",
  91. }
  92. configWithProxyPeerTLS = etcdProcessClusterConfig{
  93. clusterSize: 3,
  94. proxySize: 1,
  95. isPeerTLS: true,
  96. initialToken: "new",
  97. }
  98. )
  99. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  100. ret := cfg
  101. ret.clusterSize = 1
  102. return &ret
  103. }
  104. type etcdProcessCluster struct {
  105. cfg *etcdProcessClusterConfig
  106. procs []*etcdProcess
  107. }
  108. type etcdProcess struct {
  109. cfg *etcdProcessConfig
  110. proc *expect.ExpectProcess
  111. donec chan struct{} // closed when Interact() terminates
  112. }
  113. type etcdProcessConfig struct {
  114. args []string
  115. dataDirPath string
  116. keepDataDir bool
  117. acurl string
  118. // additional url for tls connection when the etcd process
  119. // serves both http and https
  120. acurltls string
  121. acurlHost string
  122. isProxy bool
  123. }
  124. type etcdProcessClusterConfig struct {
  125. dataDirPathPrefix string
  126. keepDataDir bool
  127. clusterSize int
  128. basePort int
  129. proxySize int
  130. clientTLS clientConnType
  131. isPeerTLS bool
  132. isPeerAutoTLS bool
  133. isClientAutoTLS bool
  134. forceNewCluster bool
  135. initialToken string
  136. quotaBackendBytes int64
  137. }
  138. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  139. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  140. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  141. etcdCfgs := cfg.etcdProcessConfigs()
  142. epc := &etcdProcessCluster{
  143. cfg: cfg,
  144. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  145. }
  146. // launch etcd processes
  147. for i := range etcdCfgs {
  148. proc, err := newEtcdProcess(etcdCfgs[i])
  149. if err != nil {
  150. epc.Close()
  151. return nil, err
  152. }
  153. epc.procs[i] = proc
  154. }
  155. return epc, epc.Start()
  156. }
  157. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  158. if !fileutil.Exist("../bin/etcd") {
  159. return nil, fmt.Errorf("could not find etcd binary")
  160. }
  161. if !cfg.keepDataDir {
  162. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  163. return nil, err
  164. }
  165. }
  166. child, err := spawnCmd(append([]string{"../bin/etcd"}, cfg.args...))
  167. if err != nil {
  168. return nil, err
  169. }
  170. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  171. }
  172. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  173. if cfg.basePort == 0 {
  174. cfg.basePort = etcdProcessBasePort
  175. }
  176. clientScheme := "http"
  177. if cfg.clientTLS == clientTLS {
  178. clientScheme = "https"
  179. }
  180. peerScheme := "http"
  181. if cfg.isPeerTLS {
  182. peerScheme = "https"
  183. }
  184. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  185. initialCluster := make([]string, cfg.clusterSize)
  186. for i := 0; i < cfg.clusterSize; i++ {
  187. var curls []string
  188. var curl, curltls string
  189. port := cfg.basePort + 2*i
  190. curlHost := fmt.Sprintf("localhost:%d", port)
  191. switch cfg.clientTLS {
  192. case clientNonTLS, clientTLS:
  193. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  194. curls = []string{curl}
  195. case clientTLSAndNonTLS:
  196. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  197. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  198. curls = []string{curl, curltls}
  199. }
  200. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  201. name := fmt.Sprintf("testname%d", i)
  202. var dataDirPath string
  203. if cfg.dataDirPathPrefix != "" {
  204. dataDirPath = fmt.Sprintf("%s%d.etcd", cfg.dataDirPathPrefix, i)
  205. } else {
  206. var derr error
  207. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  208. if derr != nil {
  209. panic("could not get tempdir for datadir")
  210. }
  211. }
  212. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  213. args := []string{
  214. "--name", name,
  215. "--listen-client-urls", strings.Join(curls, ","),
  216. "--advertise-client-urls", strings.Join(curls, ","),
  217. "--listen-peer-urls", purl.String(),
  218. "--initial-advertise-peer-urls", purl.String(),
  219. "--initial-cluster-token", cfg.initialToken,
  220. "--data-dir", dataDirPath,
  221. }
  222. if cfg.forceNewCluster {
  223. args = append(args, "--force-new-cluster")
  224. }
  225. if cfg.quotaBackendBytes > 0 {
  226. args = append(args,
  227. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  228. )
  229. }
  230. args = append(args, cfg.tlsArgs()...)
  231. etcdCfgs[i] = &etcdProcessConfig{
  232. args: args,
  233. dataDirPath: dataDirPath,
  234. keepDataDir: cfg.keepDataDir,
  235. acurl: curl,
  236. acurltls: curltls,
  237. acurlHost: curlHost,
  238. }
  239. }
  240. for i := 0; i < cfg.proxySize; i++ {
  241. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  242. curlHost := fmt.Sprintf("localhost:%d", port)
  243. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  244. name := fmt.Sprintf("testname-proxy%d", i)
  245. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  246. if derr != nil {
  247. panic("could not get tempdir for datadir")
  248. }
  249. args := []string{
  250. "--name", name,
  251. "--proxy", "on",
  252. "--listen-client-urls", curl.String(),
  253. "--data-dir", dataDirPath,
  254. }
  255. args = append(args, cfg.tlsArgs()...)
  256. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  257. args: args,
  258. dataDirPath: dataDirPath,
  259. keepDataDir: cfg.keepDataDir,
  260. acurl: curl.String(),
  261. acurlHost: curlHost,
  262. isProxy: true,
  263. }
  264. }
  265. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  266. for i := range etcdCfgs {
  267. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  268. }
  269. return etcdCfgs
  270. }
  271. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  272. if cfg.clientTLS != clientNonTLS {
  273. if cfg.isClientAutoTLS {
  274. args = append(args, "--auto-tls=true")
  275. } else {
  276. tlsClientArgs := []string{
  277. "--cert-file", certPath,
  278. "--key-file", privateKeyPath,
  279. "--ca-file", caPath,
  280. }
  281. args = append(args, tlsClientArgs...)
  282. }
  283. }
  284. if cfg.isPeerTLS {
  285. if cfg.isPeerAutoTLS {
  286. args = append(args, "--peer-auto-tls=true")
  287. } else {
  288. tlsPeerArgs := []string{
  289. "--peer-cert-file", certPath,
  290. "--peer-key-file", privateKeyPath,
  291. "--peer-ca-file", caPath,
  292. }
  293. args = append(args, tlsPeerArgs...)
  294. }
  295. }
  296. return args
  297. }
  298. func (epc *etcdProcessCluster) Start() (err error) {
  299. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  300. readyStr := "enabled capabilities for version"
  301. for i := range epc.procs {
  302. go func(etcdp *etcdProcess) {
  303. etcdp.donec = make(chan struct{})
  304. rs := readyStr
  305. if etcdp.cfg.isProxy {
  306. rs = "proxy: endpoints found"
  307. }
  308. _, err := etcdp.proc.Expect(rs)
  309. readyC <- err
  310. close(etcdp.donec)
  311. }(epc.procs[i])
  312. }
  313. for range epc.procs {
  314. if err := <-readyC; err != nil {
  315. epc.Close()
  316. return err
  317. }
  318. }
  319. return nil
  320. }
  321. func (epc *etcdProcessCluster) Restart() error {
  322. for i := range epc.procs {
  323. proc, err := newEtcdProcess(epc.procs[i].cfg)
  324. if err != nil {
  325. epc.Close()
  326. return err
  327. }
  328. epc.procs[i] = proc
  329. }
  330. return epc.Start()
  331. }
  332. func (epc *etcdProcessCluster) Stop() (err error) {
  333. for _, p := range epc.procs {
  334. if p == nil {
  335. continue
  336. }
  337. if curErr := p.proc.Stop(); curErr != nil {
  338. if err != nil {
  339. err = fmt.Errorf("%v; %v", err, curErr)
  340. } else {
  341. err = curErr
  342. }
  343. }
  344. <-p.donec
  345. }
  346. return err
  347. }
  348. func (epc *etcdProcessCluster) Close() error {
  349. err := epc.Stop()
  350. for _, p := range epc.procs {
  351. os.RemoveAll(p.cfg.dataDirPath)
  352. }
  353. return err
  354. }
  355. func spawnCmd(args []string) (*expect.ExpectProcess, error) {
  356. return expect.NewExpect(args[0], args[1:]...)
  357. }
  358. func spawnWithExpect(args []string, expected string) error {
  359. return spawnWithExpects(args, []string{expected}...)
  360. }
  361. func spawnWithExpects(args []string, xs ...string) error {
  362. proc, err := spawnCmd(args)
  363. if err != nil {
  364. return err
  365. }
  366. // process until either stdout or stderr contains
  367. // the expected string
  368. var (
  369. lines []string
  370. lineFunc = func(txt string) bool { return true }
  371. )
  372. for _, txt := range xs {
  373. for {
  374. l, err := proc.ExpectFunc(lineFunc)
  375. if err != nil {
  376. return fmt.Errorf("%v (expected %q, got %q)", err, txt, lines)
  377. }
  378. lines = append(lines, l)
  379. if strings.Contains(l, txt) {
  380. break
  381. }
  382. }
  383. }
  384. perr := proc.Close()
  385. if err != nil {
  386. return err
  387. }
  388. if len(xs) == 0 && proc.LineCount() != 0 { // expect no output
  389. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  390. }
  391. return perr
  392. }
  393. // proxies returns only the proxy etcdProcess.
  394. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  395. return epc.procs[epc.cfg.clusterSize:]
  396. }
  397. func (epc *etcdProcessCluster) backends() []*etcdProcess {
  398. return epc.procs[:epc.cfg.clusterSize]
  399. }
  400. func (epc *etcdProcessCluster) endpoints() []string {
  401. eps := make([]string, epc.cfg.clusterSize)
  402. for i, ep := range epc.backends() {
  403. eps[i] = ep.cfg.acurl
  404. }
  405. return eps
  406. }
  407. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  408. eps := make([]string, epc.cfg.clusterSize)
  409. for i, ep := range epc.backends() {
  410. eps[i] = ep.cfg.acurlHost
  411. }
  412. return eps
  413. }