etcd_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver"
  23. "github.com/coreos/etcd/pkg/expect"
  24. "github.com/coreos/etcd/pkg/fileutil"
  25. )
  26. const etcdProcessBasePort = 20000
  27. var (
  28. binPath string
  29. ctlBinPath string
  30. certPath string
  31. privateKeyPath string
  32. caPath string
  33. )
  34. type clientConnType int
  35. const (
  36. clientNonTLS clientConnType = iota
  37. clientTLS
  38. clientTLSAndNonTLS
  39. )
  40. var (
  41. configNoTLS = etcdProcessClusterConfig{
  42. clusterSize: 3,
  43. proxySize: 0,
  44. initialToken: "new",
  45. }
  46. configAutoTLS = etcdProcessClusterConfig{
  47. clusterSize: 3,
  48. isPeerTLS: true,
  49. isPeerAutoTLS: true,
  50. initialToken: "new",
  51. }
  52. configTLS = etcdProcessClusterConfig{
  53. clusterSize: 3,
  54. proxySize: 0,
  55. clientTLS: clientTLS,
  56. isPeerTLS: true,
  57. initialToken: "new",
  58. }
  59. configClientTLS = etcdProcessClusterConfig{
  60. clusterSize: 3,
  61. proxySize: 0,
  62. clientTLS: clientTLS,
  63. initialToken: "new",
  64. }
  65. configClientBoth = etcdProcessClusterConfig{
  66. clusterSize: 1,
  67. proxySize: 0,
  68. clientTLS: clientTLSAndNonTLS,
  69. initialToken: "new",
  70. }
  71. configClientAutoTLS = etcdProcessClusterConfig{
  72. clusterSize: 1,
  73. proxySize: 0,
  74. isClientAutoTLS: true,
  75. clientTLS: clientTLS,
  76. initialToken: "new",
  77. }
  78. configPeerTLS = etcdProcessClusterConfig{
  79. clusterSize: 3,
  80. proxySize: 0,
  81. isPeerTLS: true,
  82. initialToken: "new",
  83. }
  84. configWithProxy = etcdProcessClusterConfig{
  85. clusterSize: 3,
  86. proxySize: 1,
  87. initialToken: "new",
  88. }
  89. configWithProxyTLS = etcdProcessClusterConfig{
  90. clusterSize: 3,
  91. proxySize: 1,
  92. clientTLS: clientTLS,
  93. isPeerTLS: true,
  94. initialToken: "new",
  95. }
  96. configWithProxyPeerTLS = etcdProcessClusterConfig{
  97. clusterSize: 3,
  98. proxySize: 1,
  99. isPeerTLS: true,
  100. initialToken: "new",
  101. }
  102. configClientTLSCertAuth = etcdProcessClusterConfig{
  103. clusterSize: 1,
  104. proxySize: 0,
  105. clientTLS: clientTLS,
  106. initialToken: "new",
  107. clientCertAuthEnabled: true,
  108. }
  109. )
  110. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  111. ret := cfg
  112. ret.clusterSize = 1
  113. return &ret
  114. }
  115. type etcdProcessCluster struct {
  116. cfg *etcdProcessClusterConfig
  117. procs []*etcdProcess
  118. }
  119. type etcdProcess struct {
  120. cfg *etcdProcessConfig
  121. proc *expect.ExpectProcess
  122. donec chan struct{} // closed when Interact() terminates
  123. }
  124. type etcdProcessConfig struct {
  125. execPath string
  126. args []string
  127. dataDirPath string
  128. keepDataDir bool
  129. name string
  130. purl url.URL
  131. acurl string
  132. // additional url for tls connection when the etcd process
  133. // serves both http and https
  134. acurltls string
  135. acurlHost string
  136. initialToken string
  137. initialCluster string
  138. isProxy bool
  139. }
  140. type etcdProcessClusterConfig struct {
  141. execPath string
  142. dataDirPath string
  143. keepDataDir bool
  144. clusterSize int
  145. baseScheme string
  146. basePort int
  147. proxySize int
  148. snapCount int // default is 10000
  149. clientTLS clientConnType
  150. clientCertAuthEnabled bool
  151. isPeerTLS bool
  152. isPeerAutoTLS bool
  153. isClientAutoTLS bool
  154. forceNewCluster bool
  155. initialToken string
  156. quotaBackendBytes int64
  157. noStrictReconfig bool
  158. }
  159. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  160. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  161. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  162. etcdCfgs := cfg.etcdProcessConfigs()
  163. epc := &etcdProcessCluster{
  164. cfg: cfg,
  165. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  166. }
  167. // launch etcd processes
  168. for i := range etcdCfgs {
  169. proc, err := newEtcdProcess(etcdCfgs[i])
  170. if err != nil {
  171. epc.Close()
  172. return nil, err
  173. }
  174. epc.procs[i] = proc
  175. }
  176. return epc, epc.Start()
  177. }
  178. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  179. if !fileutil.Exist(cfg.execPath) {
  180. return nil, fmt.Errorf("could not find etcd binary")
  181. }
  182. if !cfg.keepDataDir {
  183. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  184. return nil, err
  185. }
  186. }
  187. child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
  188. if err != nil {
  189. return nil, err
  190. }
  191. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  192. }
  193. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  194. binPath = binDir + "/etcd"
  195. ctlBinPath = binDir + "/etcdctl"
  196. certPath = certDir + "/server.crt"
  197. privateKeyPath = certDir + "/server.key.insecure"
  198. caPath = certDir + "/ca.crt"
  199. if cfg.basePort == 0 {
  200. cfg.basePort = etcdProcessBasePort
  201. }
  202. if cfg.execPath == "" {
  203. cfg.execPath = binPath
  204. }
  205. if cfg.snapCount == 0 {
  206. cfg.snapCount = etcdserver.DefaultSnapCount
  207. }
  208. clientScheme := "http"
  209. if cfg.clientTLS == clientTLS {
  210. clientScheme = "https"
  211. }
  212. peerScheme := cfg.baseScheme
  213. if peerScheme == "" {
  214. peerScheme = "http"
  215. }
  216. if cfg.isPeerTLS {
  217. peerScheme += "s"
  218. }
  219. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  220. initialCluster := make([]string, cfg.clusterSize)
  221. for i := 0; i < cfg.clusterSize; i++ {
  222. var curls []string
  223. var curl, curltls string
  224. port := cfg.basePort + 2*i
  225. curlHost := fmt.Sprintf("localhost:%d", port)
  226. switch cfg.clientTLS {
  227. case clientNonTLS, clientTLS:
  228. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  229. curls = []string{curl}
  230. case clientTLSAndNonTLS:
  231. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  232. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  233. curls = []string{curl, curltls}
  234. }
  235. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  236. name := fmt.Sprintf("testname%d", i)
  237. dataDirPath := cfg.dataDirPath
  238. if cfg.dataDirPath == "" {
  239. var derr error
  240. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  241. if derr != nil {
  242. panic("could not get tempdir for datadir")
  243. }
  244. }
  245. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  246. args := []string{
  247. "--name", name,
  248. "--listen-client-urls", strings.Join(curls, ","),
  249. "--advertise-client-urls", strings.Join(curls, ","),
  250. "--listen-peer-urls", purl.String(),
  251. "--initial-advertise-peer-urls", purl.String(),
  252. "--initial-cluster-token", cfg.initialToken,
  253. "--data-dir", dataDirPath,
  254. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  255. }
  256. if cfg.forceNewCluster {
  257. args = append(args, "--force-new-cluster")
  258. }
  259. if cfg.quotaBackendBytes > 0 {
  260. args = append(args,
  261. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  262. )
  263. }
  264. if cfg.noStrictReconfig {
  265. args = append(args, "--strict-reconfig-check=false")
  266. }
  267. args = append(args, cfg.tlsArgs()...)
  268. etcdCfgs[i] = &etcdProcessConfig{
  269. execPath: cfg.execPath,
  270. args: args,
  271. dataDirPath: dataDirPath,
  272. keepDataDir: cfg.keepDataDir,
  273. name: name,
  274. purl: purl,
  275. acurl: curl,
  276. acurltls: curltls,
  277. acurlHost: curlHost,
  278. initialToken: cfg.initialToken,
  279. }
  280. }
  281. for i := 0; i < cfg.proxySize; i++ {
  282. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  283. curlHost := fmt.Sprintf("localhost:%d", port)
  284. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  285. name := fmt.Sprintf("testname-proxy%d", i)
  286. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  287. if derr != nil {
  288. panic("could not get tempdir for datadir")
  289. }
  290. args := []string{
  291. "--name", name,
  292. "--proxy", "on",
  293. "--listen-client-urls", curl.String(),
  294. "--data-dir", dataDirPath,
  295. }
  296. args = append(args, cfg.tlsArgs()...)
  297. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  298. execPath: cfg.execPath,
  299. args: args,
  300. dataDirPath: dataDirPath,
  301. keepDataDir: cfg.keepDataDir,
  302. name: name,
  303. acurl: curl.String(),
  304. acurlHost: curlHost,
  305. isProxy: true,
  306. }
  307. }
  308. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  309. for i := range etcdCfgs {
  310. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  311. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  312. }
  313. return etcdCfgs
  314. }
  315. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  316. if cfg.clientTLS != clientNonTLS {
  317. if cfg.isClientAutoTLS {
  318. args = append(args, "--auto-tls=true")
  319. } else {
  320. tlsClientArgs := []string{
  321. "--cert-file", certPath,
  322. "--key-file", privateKeyPath,
  323. "--ca-file", caPath,
  324. }
  325. args = append(args, tlsClientArgs...)
  326. if cfg.clientCertAuthEnabled {
  327. args = append(args, "--client-cert-auth")
  328. }
  329. }
  330. }
  331. if cfg.isPeerTLS {
  332. if cfg.isPeerAutoTLS {
  333. args = append(args, "--peer-auto-tls=true")
  334. } else {
  335. tlsPeerArgs := []string{
  336. "--peer-cert-file", certPath,
  337. "--peer-key-file", privateKeyPath,
  338. "--peer-ca-file", caPath,
  339. }
  340. args = append(args, tlsPeerArgs...)
  341. }
  342. }
  343. return args
  344. }
  345. func (epc *etcdProcessCluster) Start() (err error) {
  346. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  347. for i := range epc.procs {
  348. go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
  349. }
  350. for range epc.procs {
  351. if err := <-readyC; err != nil {
  352. epc.Close()
  353. return err
  354. }
  355. }
  356. return nil
  357. }
  358. func (epc *etcdProcessCluster) RestartAll() error {
  359. for i := range epc.procs {
  360. proc, err := newEtcdProcess(epc.procs[i].cfg)
  361. if err != nil {
  362. epc.Close()
  363. return err
  364. }
  365. epc.procs[i] = proc
  366. }
  367. return epc.Start()
  368. }
  369. func (epc *etcdProcessCluster) StopAll() (err error) {
  370. for _, p := range epc.procs {
  371. if p == nil {
  372. continue
  373. }
  374. if curErr := p.Stop(); curErr != nil {
  375. if err != nil {
  376. err = fmt.Errorf("%v; %v", err, curErr)
  377. } else {
  378. err = curErr
  379. }
  380. }
  381. }
  382. return err
  383. }
  384. func (epc *etcdProcessCluster) Close() error {
  385. err := epc.StopAll()
  386. for _, p := range epc.procs {
  387. // p is nil when newEtcdProcess fails in the middle
  388. // Close still gets called to clean up test data
  389. if p == nil {
  390. continue
  391. }
  392. os.RemoveAll(p.cfg.dataDirPath)
  393. }
  394. return err
  395. }
  396. func (ep *etcdProcess) Restart() error {
  397. newEp, err := newEtcdProcess(ep.cfg)
  398. if err != nil {
  399. ep.Stop()
  400. return err
  401. }
  402. *ep = *newEp
  403. if err = ep.waitReady(); err != nil {
  404. ep.Stop()
  405. return err
  406. }
  407. return nil
  408. }
  409. func (ep *etcdProcess) Stop() error {
  410. if ep == nil {
  411. return nil
  412. }
  413. if err := ep.proc.Stop(); err != nil {
  414. return err
  415. }
  416. <-ep.donec
  417. if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
  418. os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path)
  419. }
  420. return nil
  421. }
  422. func (ep *etcdProcess) waitReady() error {
  423. defer close(ep.donec)
  424. return waitReadyExpectProc(ep.proc, ep.cfg.isProxy)
  425. }
  426. func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error {
  427. readyStrs := []string{"enabled capabilities for version", "published"}
  428. if isProxy {
  429. readyStrs = []string{"httpproxy: endpoints found"}
  430. }
  431. c := 0
  432. matchSet := func(l string) bool {
  433. for _, s := range readyStrs {
  434. if strings.Contains(l, s) {
  435. c++
  436. break
  437. }
  438. }
  439. return c == len(readyStrs)
  440. }
  441. _, err := exproc.ExpectFunc(matchSet)
  442. return err
  443. }
  444. func spawnWithExpect(args []string, expected string) error {
  445. return spawnWithExpects(args, []string{expected}...)
  446. }
  447. func spawnWithExpects(args []string, xs ...string) error {
  448. proc, err := spawnCmd(args)
  449. if err != nil {
  450. return err
  451. }
  452. // process until either stdout or stderr contains
  453. // the expected string
  454. var (
  455. lines []string
  456. lineFunc = func(txt string) bool { return true }
  457. )
  458. for _, txt := range xs {
  459. for {
  460. l, lerr := proc.ExpectFunc(lineFunc)
  461. if lerr != nil {
  462. proc.Close()
  463. return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
  464. }
  465. lines = append(lines, l)
  466. if strings.Contains(l, txt) {
  467. break
  468. }
  469. }
  470. }
  471. perr := proc.Close()
  472. if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
  473. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  474. }
  475. return perr
  476. }
  477. // proxies returns only the proxy etcdProcess.
  478. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  479. return epc.procs[epc.cfg.clusterSize:]
  480. }
  481. func (epc *etcdProcessCluster) processes() []*etcdProcess {
  482. return epc.procs[:epc.cfg.clusterSize]
  483. }
  484. func (epc *etcdProcessCluster) endpoints() []string {
  485. eps := make([]string, epc.cfg.clusterSize)
  486. for i, ep := range epc.processes() {
  487. eps[i] = ep.cfg.acurl
  488. }
  489. return eps
  490. }
  491. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  492. eps := make([]string, epc.cfg.clusterSize)
  493. for i, ep := range epc.processes() {
  494. eps[i] = ep.cfg.acurlHost
  495. }
  496. return eps
  497. }
  498. func (epc *etcdProcessCluster) withStopSignal(sig os.Signal) os.Signal {
  499. ret := epc.procs[0].proc.StopSignal
  500. for _, p := range epc.procs {
  501. p.proc.StopSignal = sig
  502. }
  503. return ret
  504. }
  505. func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
  506. errc := make(chan error, 1)
  507. go func() { errc <- p.Close() }()
  508. select {
  509. case err := <-errc:
  510. return err
  511. case <-time.After(d):
  512. p.Stop()
  513. // retry close after stopping to collect SIGQUIT data, if any
  514. closeWithTimeout(p, time.Second)
  515. }
  516. return fmt.Errorf("took longer than %v to Close process %+v", d, p)
  517. }