etcd_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver"
  23. "github.com/coreos/etcd/pkg/expect"
  24. "github.com/coreos/etcd/pkg/fileutil"
  25. )
  26. const etcdProcessBasePort = 20000
  27. var (
  28. binPath string
  29. ctlBinPath string
  30. certPath string
  31. privateKeyPath string
  32. caPath string
  33. )
  34. type clientConnType int
  35. const (
  36. clientNonTLS clientConnType = iota
  37. clientTLS
  38. clientTLSAndNonTLS
  39. )
  40. var (
  41. configNoTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. proxySize: 0,
  44. initialToken: "new",
  45. }
  46. configAutoTLS = etcdProcessClusterConfig{
  47. clusterSize: 3,
  48. isPeerTLS: true,
  49. isPeerAutoTLS: true,
  50. initialToken: "new",
  51. }
  52. configTLS = etcdProcessClusterConfig{
  53. clusterSize: 3,
  54. proxySize: 0,
  55. clientTLS: clientTLS,
  56. isPeerTLS: true,
  57. initialToken: "new",
  58. }
  59. configClientTLS = etcdProcessClusterConfig{
  60. clusterSize: 3,
  61. proxySize: 0,
  62. clientTLS: clientTLS,
  63. initialToken: "new",
  64. }
  65. configClientBoth = etcdProcessClusterConfig{
  66. clusterSize: 1,
  67. proxySize: 0,
  68. clientTLS: clientTLSAndNonTLS,
  69. initialToken: "new",
  70. }
  71. configClientAutoTLS = etcdProcessClusterConfig{
  72. clusterSize: 1,
  73. proxySize: 0,
  74. isClientAutoTLS: true,
  75. clientTLS: clientTLS,
  76. initialToken: "new",
  77. }
  78. configPeerTLS = etcdProcessClusterConfig{
  79. clusterSize: 3,
  80. proxySize: 0,
  81. isPeerTLS: true,
  82. initialToken: "new",
  83. }
  84. configWithProxy = etcdProcessClusterConfig{
  85. clusterSize: 3,
  86. proxySize: 1,
  87. initialToken: "new",
  88. }
  89. configWithProxyTLS = etcdProcessClusterConfig{
  90. clusterSize: 3,
  91. proxySize: 1,
  92. clientTLS: clientTLS,
  93. isPeerTLS: true,
  94. initialToken: "new",
  95. }
  96. configWithProxyPeerTLS = etcdProcessClusterConfig{
  97. clusterSize: 3,
  98. proxySize: 1,
  99. isPeerTLS: true,
  100. initialToken: "new",
  101. }
  102. configClientTLSCertAuth = etcdProcessClusterConfig{
  103. clusterSize: 1,
  104. proxySize: 0,
  105. clientTLS: clientTLS,
  106. initialToken: "new",
  107. clientCertAuthEnabled: true,
  108. }
  109. )
  110. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  111. ret := cfg
  112. ret.clusterSize = 1
  113. return &ret
  114. }
  115. type etcdProcessCluster struct {
  116. cfg *etcdProcessClusterConfig
  117. procs []*etcdProcess
  118. }
  119. type etcdProcess struct {
  120. cfg *etcdProcessConfig
  121. proc *expect.ExpectProcess
  122. donec chan struct{} // closed when Interact() terminates
  123. }
  124. type etcdProcessConfig struct {
  125. execPath string
  126. args []string
  127. dataDirPath string
  128. keepDataDir bool
  129. name string
  130. purl url.URL
  131. acurl string
  132. // additional url for tls connection when the etcd process
  133. // serves both http and https
  134. acurltls string
  135. acurlHost string
  136. initialToken string
  137. initialCluster string
  138. isProxy bool
  139. }
  140. type etcdProcessClusterConfig struct {
  141. execPath string
  142. dataDirPath string
  143. keepDataDir bool
  144. clusterSize int
  145. baseScheme string
  146. basePort int
  147. proxySize int
  148. snapCount int // default is 10000
  149. clientTLS clientConnType
  150. clientCertAuthEnabled bool
  151. isPeerTLS bool
  152. isPeerAutoTLS bool
  153. isClientAutoTLS bool
  154. forceNewCluster bool
  155. initialToken string
  156. quotaBackendBytes int64
  157. noStrictReconfig bool
  158. cipherSuites []string
  159. }
  160. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  161. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  162. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  163. etcdCfgs := cfg.etcdProcessConfigs()
  164. epc := &etcdProcessCluster{
  165. cfg: cfg,
  166. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  167. }
  168. // launch etcd processes
  169. for i := range etcdCfgs {
  170. proc, err := newEtcdProcess(etcdCfgs[i])
  171. if err != nil {
  172. epc.Close()
  173. return nil, err
  174. }
  175. epc.procs[i] = proc
  176. }
  177. return epc, epc.Start()
  178. }
  179. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  180. if !fileutil.Exist(cfg.execPath) {
  181. return nil, fmt.Errorf("could not find etcd binary")
  182. }
  183. if !cfg.keepDataDir {
  184. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  185. return nil, err
  186. }
  187. }
  188. child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
  189. if err != nil {
  190. return nil, err
  191. }
  192. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  193. }
  194. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  195. binPath = binDir + "/etcd"
  196. ctlBinPath = binDir + "/etcdctl"
  197. certPath = certDir + "/server.crt"
  198. privateKeyPath = certDir + "/server.key.insecure"
  199. caPath = certDir + "/ca.crt"
  200. if cfg.basePort == 0 {
  201. cfg.basePort = etcdProcessBasePort
  202. }
  203. if cfg.execPath == "" {
  204. cfg.execPath = binPath
  205. }
  206. if cfg.snapCount == 0 {
  207. cfg.snapCount = etcdserver.DefaultSnapCount
  208. }
  209. clientScheme := "http"
  210. if cfg.clientTLS == clientTLS {
  211. clientScheme = "https"
  212. }
  213. peerScheme := cfg.baseScheme
  214. if peerScheme == "" {
  215. peerScheme = "http"
  216. }
  217. if cfg.isPeerTLS {
  218. peerScheme += "s"
  219. }
  220. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  221. initialCluster := make([]string, cfg.clusterSize)
  222. for i := 0; i < cfg.clusterSize; i++ {
  223. var curls []string
  224. var curl, curltls string
  225. port := cfg.basePort + 2*i
  226. curlHost := fmt.Sprintf("localhost:%d", port)
  227. switch cfg.clientTLS {
  228. case clientNonTLS, clientTLS:
  229. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  230. curls = []string{curl}
  231. case clientTLSAndNonTLS:
  232. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  233. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  234. curls = []string{curl, curltls}
  235. }
  236. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  237. name := fmt.Sprintf("testname%d", i)
  238. dataDirPath := cfg.dataDirPath
  239. if cfg.dataDirPath == "" {
  240. var derr error
  241. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  242. if derr != nil {
  243. panic("could not get tempdir for datadir")
  244. }
  245. }
  246. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  247. args := []string{
  248. "--name", name,
  249. "--listen-client-urls", strings.Join(curls, ","),
  250. "--advertise-client-urls", strings.Join(curls, ","),
  251. "--listen-peer-urls", purl.String(),
  252. "--initial-advertise-peer-urls", purl.String(),
  253. "--initial-cluster-token", cfg.initialToken,
  254. "--data-dir", dataDirPath,
  255. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  256. }
  257. if cfg.forceNewCluster {
  258. args = append(args, "--force-new-cluster")
  259. }
  260. if cfg.quotaBackendBytes > 0 {
  261. args = append(args,
  262. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  263. )
  264. }
  265. if cfg.noStrictReconfig {
  266. args = append(args, "--strict-reconfig-check=false")
  267. }
  268. args = append(args, cfg.tlsArgs()...)
  269. etcdCfgs[i] = &etcdProcessConfig{
  270. execPath: cfg.execPath,
  271. args: args,
  272. dataDirPath: dataDirPath,
  273. keepDataDir: cfg.keepDataDir,
  274. name: name,
  275. purl: purl,
  276. acurl: curl,
  277. acurltls: curltls,
  278. acurlHost: curlHost,
  279. initialToken: cfg.initialToken,
  280. }
  281. }
  282. for i := 0; i < cfg.proxySize; i++ {
  283. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  284. curlHost := fmt.Sprintf("localhost:%d", port)
  285. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  286. name := fmt.Sprintf("testname-proxy%d", i)
  287. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  288. if derr != nil {
  289. panic("could not get tempdir for datadir")
  290. }
  291. args := []string{
  292. "--name", name,
  293. "--proxy", "on",
  294. "--listen-client-urls", curl.String(),
  295. "--data-dir", dataDirPath,
  296. }
  297. args = append(args, cfg.tlsArgs()...)
  298. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  299. execPath: cfg.execPath,
  300. args: args,
  301. dataDirPath: dataDirPath,
  302. keepDataDir: cfg.keepDataDir,
  303. name: name,
  304. acurl: curl.String(),
  305. acurlHost: curlHost,
  306. isProxy: true,
  307. }
  308. }
  309. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  310. for i := range etcdCfgs {
  311. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  312. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  313. }
  314. return etcdCfgs
  315. }
  316. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  317. if cfg.clientTLS != clientNonTLS {
  318. if cfg.isClientAutoTLS {
  319. args = append(args, "--auto-tls=true")
  320. } else {
  321. tlsClientArgs := []string{
  322. "--cert-file", certPath,
  323. "--key-file", privateKeyPath,
  324. "--ca-file", caPath,
  325. }
  326. args = append(args, tlsClientArgs...)
  327. if cfg.clientCertAuthEnabled {
  328. args = append(args, "--client-cert-auth")
  329. }
  330. }
  331. }
  332. if cfg.isPeerTLS {
  333. if cfg.isPeerAutoTLS {
  334. args = append(args, "--peer-auto-tls=true")
  335. } else {
  336. tlsPeerArgs := []string{
  337. "--peer-cert-file", certPath,
  338. "--peer-key-file", privateKeyPath,
  339. "--peer-ca-file", caPath,
  340. }
  341. args = append(args, tlsPeerArgs...)
  342. }
  343. }
  344. if len(cfg.cipherSuites) > 0 {
  345. args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
  346. }
  347. return args
  348. }
  349. func (epc *etcdProcessCluster) Start() (err error) {
  350. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  351. for i := range epc.procs {
  352. go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
  353. }
  354. for range epc.procs {
  355. if err := <-readyC; err != nil {
  356. epc.Close()
  357. return err
  358. }
  359. }
  360. return nil
  361. }
  362. func (epc *etcdProcessCluster) RestartAll() error {
  363. for i := range epc.procs {
  364. proc, err := newEtcdProcess(epc.procs[i].cfg)
  365. if err != nil {
  366. epc.Close()
  367. return err
  368. }
  369. epc.procs[i] = proc
  370. }
  371. return epc.Start()
  372. }
  373. func (epc *etcdProcessCluster) StopAll() (err error) {
  374. for _, p := range epc.procs {
  375. if p == nil {
  376. continue
  377. }
  378. if curErr := p.Stop(); curErr != nil {
  379. if err != nil {
  380. err = fmt.Errorf("%v; %v", err, curErr)
  381. } else {
  382. err = curErr
  383. }
  384. }
  385. }
  386. return err
  387. }
  388. func (epc *etcdProcessCluster) Close() error {
  389. err := epc.StopAll()
  390. for _, p := range epc.procs {
  391. // p is nil when newEtcdProcess fails in the middle
  392. // Close still gets called to clean up test data
  393. if p == nil {
  394. continue
  395. }
  396. os.RemoveAll(p.cfg.dataDirPath)
  397. }
  398. return err
  399. }
  400. func (ep *etcdProcess) Restart() error {
  401. newEp, err := newEtcdProcess(ep.cfg)
  402. if err != nil {
  403. ep.Stop()
  404. return err
  405. }
  406. *ep = *newEp
  407. if err = ep.waitReady(); err != nil {
  408. ep.Stop()
  409. return err
  410. }
  411. return nil
  412. }
  413. func (ep *etcdProcess) Stop() error {
  414. if ep == nil {
  415. return nil
  416. }
  417. if err := ep.proc.Stop(); err != nil {
  418. return err
  419. }
  420. <-ep.donec
  421. if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
  422. os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path)
  423. }
  424. return nil
  425. }
  426. func (ep *etcdProcess) waitReady() error {
  427. defer close(ep.donec)
  428. return waitReadyExpectProc(ep.proc, ep.cfg.isProxy)
  429. }
  430. func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error {
  431. readyStrs := []string{"enabled capabilities for version", "published"}
  432. if isProxy {
  433. readyStrs = []string{"httpproxy: endpoints found"}
  434. }
  435. c := 0
  436. matchSet := func(l string) bool {
  437. for _, s := range readyStrs {
  438. if strings.Contains(l, s) {
  439. c++
  440. break
  441. }
  442. }
  443. return c == len(readyStrs)
  444. }
  445. _, err := exproc.ExpectFunc(matchSet)
  446. return err
  447. }
  448. func spawnWithExpect(args []string, expected string) error {
  449. return spawnWithExpects(args, []string{expected}...)
  450. }
  451. func spawnWithExpects(args []string, xs ...string) error {
  452. proc, err := spawnCmd(args)
  453. if err != nil {
  454. return err
  455. }
  456. // process until either stdout or stderr contains
  457. // the expected string
  458. var (
  459. lines []string
  460. lineFunc = func(txt string) bool { return true }
  461. )
  462. for _, txt := range xs {
  463. for {
  464. l, lerr := proc.ExpectFunc(lineFunc)
  465. if lerr != nil {
  466. proc.Close()
  467. return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
  468. }
  469. lines = append(lines, l)
  470. if strings.Contains(l, txt) {
  471. break
  472. }
  473. }
  474. }
  475. perr := proc.Close()
  476. if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
  477. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  478. }
  479. return perr
  480. }
  481. // proxies returns only the proxy etcdProcess.
  482. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  483. return epc.procs[epc.cfg.clusterSize:]
  484. }
  485. func (epc *etcdProcessCluster) processes() []*etcdProcess {
  486. return epc.procs[:epc.cfg.clusterSize]
  487. }
  488. func (epc *etcdProcessCluster) endpoints() []string {
  489. eps := make([]string, epc.cfg.clusterSize)
  490. for i, ep := range epc.processes() {
  491. eps[i] = ep.cfg.acurl
  492. }
  493. return eps
  494. }
  495. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  496. eps := make([]string, epc.cfg.clusterSize)
  497. for i, ep := range epc.processes() {
  498. eps[i] = ep.cfg.acurlHost
  499. }
  500. return eps
  501. }
  502. func (epc *etcdProcessCluster) withStopSignal(sig os.Signal) os.Signal {
  503. ret := epc.procs[0].proc.StopSignal
  504. for _, p := range epc.procs {
  505. p.proc.StopSignal = sig
  506. }
  507. return ret
  508. }
  509. func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
  510. errc := make(chan error, 1)
  511. go func() { errc <- p.Close() }()
  512. select {
  513. case err := <-errc:
  514. return err
  515. case <-time.After(d):
  516. p.Stop()
  517. // retry close after stopping to collect SIGQUIT data, if any
  518. closeWithTimeout(p, time.Second)
  519. }
  520. return fmt.Errorf("took longer than %v to Close process %+v", d, p)
  521. }