etcd_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "time"
  22. "github.com/coreos/etcd/etcdserver"
  23. "github.com/coreos/etcd/pkg/expect"
  24. "github.com/coreos/etcd/pkg/fileutil"
  25. )
  26. const etcdProcessBasePort = 20000
  27. var (
  28. binPath string
  29. ctlBinPath string
  30. certPath string
  31. privateKeyPath string
  32. caPath string
  33. crlPath string
  34. revokedCertPath string
  35. revokedPrivateKeyPath string
  36. )
  37. type clientConnType int
  38. const (
  39. clientNonTLS clientConnType = iota
  40. clientTLS
  41. clientTLSAndNonTLS
  42. )
  43. var (
  44. configNoTLS = etcdProcessClusterConfig{
  45. clusterSize: 3,
  46. proxySize: 0,
  47. initialToken: "new",
  48. }
  49. configAutoTLS = etcdProcessClusterConfig{
  50. clusterSize: 3,
  51. isPeerTLS: true,
  52. isPeerAutoTLS: true,
  53. initialToken: "new",
  54. }
  55. configTLS = etcdProcessClusterConfig{
  56. clusterSize: 3,
  57. proxySize: 0,
  58. clientTLS: clientTLS,
  59. isPeerTLS: true,
  60. initialToken: "new",
  61. }
  62. configClientTLS = etcdProcessClusterConfig{
  63. clusterSize: 3,
  64. proxySize: 0,
  65. clientTLS: clientTLS,
  66. initialToken: "new",
  67. }
  68. configClientBoth = etcdProcessClusterConfig{
  69. clusterSize: 1,
  70. proxySize: 0,
  71. clientTLS: clientTLSAndNonTLS,
  72. initialToken: "new",
  73. }
  74. configClientAutoTLS = etcdProcessClusterConfig{
  75. clusterSize: 1,
  76. proxySize: 0,
  77. isClientAutoTLS: true,
  78. clientTLS: clientTLS,
  79. initialToken: "new",
  80. }
  81. configPeerTLS = etcdProcessClusterConfig{
  82. clusterSize: 3,
  83. proxySize: 0,
  84. isPeerTLS: true,
  85. initialToken: "new",
  86. }
  87. configWithProxy = etcdProcessClusterConfig{
  88. clusterSize: 3,
  89. proxySize: 1,
  90. initialToken: "new",
  91. }
  92. configWithProxyTLS = etcdProcessClusterConfig{
  93. clusterSize: 3,
  94. proxySize: 1,
  95. clientTLS: clientTLS,
  96. isPeerTLS: true,
  97. initialToken: "new",
  98. }
  99. configWithProxyPeerTLS = etcdProcessClusterConfig{
  100. clusterSize: 3,
  101. proxySize: 1,
  102. isPeerTLS: true,
  103. initialToken: "new",
  104. }
  105. configClientTLSCertAuth = etcdProcessClusterConfig{
  106. clusterSize: 1,
  107. proxySize: 0,
  108. clientTLS: clientTLS,
  109. initialToken: "new",
  110. clientCertAuthEnabled: true,
  111. }
  112. )
  113. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  114. ret := cfg
  115. ret.clusterSize = 1
  116. return &ret
  117. }
  118. type etcdProcessCluster struct {
  119. cfg *etcdProcessClusterConfig
  120. procs []*etcdProcess
  121. }
  122. type etcdProcess struct {
  123. cfg *etcdProcessConfig
  124. proc *expect.ExpectProcess
  125. donec chan struct{} // closed when Interact() terminates
  126. }
  127. type etcdProcessConfig struct {
  128. execPath string
  129. args []string
  130. dataDirPath string
  131. keepDataDir bool
  132. name string
  133. purl url.URL
  134. acurl string
  135. // additional url for tls connection when the etcd process
  136. // serves both http and https
  137. acurltls string
  138. acurlHost string
  139. initialToken string
  140. initialCluster string
  141. isProxy bool
  142. }
  143. type etcdProcessClusterConfig struct {
  144. execPath string
  145. dataDirPath string
  146. keepDataDir bool
  147. clusterSize int
  148. baseScheme string
  149. basePort int
  150. proxySize int
  151. snapCount int // default is 10000
  152. clientTLS clientConnType
  153. clientCertAuthEnabled bool
  154. isPeerTLS bool
  155. isPeerAutoTLS bool
  156. isClientAutoTLS bool
  157. isClientCRL bool
  158. forceNewCluster bool
  159. initialToken string
  160. quotaBackendBytes int64
  161. noStrictReconfig bool
  162. }
  163. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  164. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  165. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  166. etcdCfgs := cfg.etcdProcessConfigs()
  167. epc := &etcdProcessCluster{
  168. cfg: cfg,
  169. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  170. }
  171. // launch etcd processes
  172. for i := range etcdCfgs {
  173. proc, err := newEtcdProcess(etcdCfgs[i])
  174. if err != nil {
  175. epc.Close()
  176. return nil, err
  177. }
  178. epc.procs[i] = proc
  179. }
  180. return epc, epc.Start()
  181. }
  182. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  183. if !fileutil.Exist(cfg.execPath) {
  184. return nil, fmt.Errorf("could not find etcd binary")
  185. }
  186. if !cfg.keepDataDir {
  187. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  188. return nil, err
  189. }
  190. }
  191. child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
  192. if err != nil {
  193. return nil, err
  194. }
  195. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  196. }
  197. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  198. binPath = binDir + "/etcd"
  199. ctlBinPath = binDir + "/etcdctl"
  200. certPath = certDir + "/server.crt"
  201. privateKeyPath = certDir + "/server.key.insecure"
  202. caPath = certDir + "/ca.crt"
  203. revokedCertPath = certDir + "/server-revoked.crt"
  204. revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure"
  205. crlPath = certDir + "/revoke.crl"
  206. if cfg.basePort == 0 {
  207. cfg.basePort = etcdProcessBasePort
  208. }
  209. if cfg.execPath == "" {
  210. cfg.execPath = binPath
  211. }
  212. if cfg.snapCount == 0 {
  213. cfg.snapCount = etcdserver.DefaultSnapCount
  214. }
  215. clientScheme := "http"
  216. if cfg.clientTLS == clientTLS {
  217. clientScheme = "https"
  218. }
  219. peerScheme := cfg.baseScheme
  220. if peerScheme == "" {
  221. peerScheme = "http"
  222. }
  223. if cfg.isPeerTLS {
  224. peerScheme += "s"
  225. }
  226. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  227. initialCluster := make([]string, cfg.clusterSize)
  228. for i := 0; i < cfg.clusterSize; i++ {
  229. var curls []string
  230. var curl, curltls string
  231. port := cfg.basePort + 2*i
  232. curlHost := fmt.Sprintf("localhost:%d", port)
  233. switch cfg.clientTLS {
  234. case clientNonTLS, clientTLS:
  235. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  236. curls = []string{curl}
  237. case clientTLSAndNonTLS:
  238. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  239. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  240. curls = []string{curl, curltls}
  241. }
  242. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  243. name := fmt.Sprintf("testname%d", i)
  244. dataDirPath := cfg.dataDirPath
  245. if cfg.dataDirPath == "" {
  246. var derr error
  247. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  248. if derr != nil {
  249. panic("could not get tempdir for datadir")
  250. }
  251. }
  252. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  253. args := []string{
  254. "--name", name,
  255. "--listen-client-urls", strings.Join(curls, ","),
  256. "--advertise-client-urls", strings.Join(curls, ","),
  257. "--listen-peer-urls", purl.String(),
  258. "--initial-advertise-peer-urls", purl.String(),
  259. "--initial-cluster-token", cfg.initialToken,
  260. "--data-dir", dataDirPath,
  261. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  262. }
  263. if cfg.forceNewCluster {
  264. args = append(args, "--force-new-cluster")
  265. }
  266. if cfg.quotaBackendBytes > 0 {
  267. args = append(args,
  268. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  269. )
  270. }
  271. if cfg.noStrictReconfig {
  272. args = append(args, "--strict-reconfig-check=false")
  273. }
  274. args = append(args, cfg.tlsArgs()...)
  275. etcdCfgs[i] = &etcdProcessConfig{
  276. execPath: cfg.execPath,
  277. args: args,
  278. dataDirPath: dataDirPath,
  279. keepDataDir: cfg.keepDataDir,
  280. name: name,
  281. purl: purl,
  282. acurl: curl,
  283. acurltls: curltls,
  284. acurlHost: curlHost,
  285. initialToken: cfg.initialToken,
  286. }
  287. }
  288. for i := 0; i < cfg.proxySize; i++ {
  289. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  290. curlHost := fmt.Sprintf("localhost:%d", port)
  291. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  292. name := fmt.Sprintf("testname-proxy%d", i)
  293. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  294. if derr != nil {
  295. panic("could not get tempdir for datadir")
  296. }
  297. args := []string{
  298. "--name", name,
  299. "--proxy", "on",
  300. "--listen-client-urls", curl.String(),
  301. "--data-dir", dataDirPath,
  302. }
  303. args = append(args, cfg.tlsArgs()...)
  304. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  305. execPath: cfg.execPath,
  306. args: args,
  307. dataDirPath: dataDirPath,
  308. keepDataDir: cfg.keepDataDir,
  309. name: name,
  310. acurl: curl.String(),
  311. acurlHost: curlHost,
  312. isProxy: true,
  313. }
  314. }
  315. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  316. for i := range etcdCfgs {
  317. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  318. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  319. }
  320. return etcdCfgs
  321. }
  322. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  323. if cfg.clientTLS != clientNonTLS {
  324. if cfg.isClientAutoTLS {
  325. args = append(args, "--auto-tls=true")
  326. } else {
  327. tlsClientArgs := []string{
  328. "--cert-file", certPath,
  329. "--key-file", privateKeyPath,
  330. "--ca-file", caPath,
  331. }
  332. args = append(args, tlsClientArgs...)
  333. if cfg.clientCertAuthEnabled {
  334. args = append(args, "--client-cert-auth")
  335. }
  336. }
  337. }
  338. if cfg.isPeerTLS {
  339. if cfg.isPeerAutoTLS {
  340. args = append(args, "--peer-auto-tls=true")
  341. } else {
  342. tlsPeerArgs := []string{
  343. "--peer-cert-file", certPath,
  344. "--peer-key-file", privateKeyPath,
  345. "--peer-ca-file", caPath,
  346. }
  347. args = append(args, tlsPeerArgs...)
  348. }
  349. }
  350. if cfg.isClientCRL {
  351. args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
  352. }
  353. return args
  354. }
  355. func (epc *etcdProcessCluster) Start() (err error) {
  356. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  357. for i := range epc.procs {
  358. go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
  359. }
  360. for range epc.procs {
  361. if err := <-readyC; err != nil {
  362. epc.Close()
  363. return err
  364. }
  365. }
  366. return nil
  367. }
  368. func (epc *etcdProcessCluster) RestartAll() error {
  369. for i := range epc.procs {
  370. proc, err := newEtcdProcess(epc.procs[i].cfg)
  371. if err != nil {
  372. epc.Close()
  373. return err
  374. }
  375. epc.procs[i] = proc
  376. }
  377. return epc.Start()
  378. }
  379. func (epc *etcdProcessCluster) StopAll() (err error) {
  380. for _, p := range epc.procs {
  381. if p == nil {
  382. continue
  383. }
  384. if curErr := p.Stop(); curErr != nil {
  385. if err != nil {
  386. err = fmt.Errorf("%v; %v", err, curErr)
  387. } else {
  388. err = curErr
  389. }
  390. }
  391. }
  392. return err
  393. }
  394. func (epc *etcdProcessCluster) Close() error {
  395. err := epc.StopAll()
  396. for _, p := range epc.procs {
  397. // p is nil when newEtcdProcess fails in the middle
  398. // Close still gets called to clean up test data
  399. if p == nil {
  400. continue
  401. }
  402. os.RemoveAll(p.cfg.dataDirPath)
  403. }
  404. return err
  405. }
  406. func (ep *etcdProcess) Restart() error {
  407. newEp, err := newEtcdProcess(ep.cfg)
  408. if err != nil {
  409. ep.Stop()
  410. return err
  411. }
  412. *ep = *newEp
  413. if err = ep.waitReady(); err != nil {
  414. ep.Stop()
  415. return err
  416. }
  417. return nil
  418. }
  419. func (ep *etcdProcess) Stop() error {
  420. if ep == nil {
  421. return nil
  422. }
  423. if err := ep.proc.Stop(); err != nil {
  424. return err
  425. }
  426. <-ep.donec
  427. if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
  428. os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path)
  429. }
  430. return nil
  431. }
  432. func (ep *etcdProcess) waitReady() error {
  433. defer close(ep.donec)
  434. return waitReadyExpectProc(ep.proc, ep.cfg.isProxy)
  435. }
  436. func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error {
  437. readyStrs := []string{"enabled capabilities for version", "published"}
  438. if isProxy {
  439. readyStrs = []string{"httpproxy: endpoints found"}
  440. }
  441. c := 0
  442. matchSet := func(l string) bool {
  443. for _, s := range readyStrs {
  444. if strings.Contains(l, s) {
  445. c++
  446. break
  447. }
  448. }
  449. return c == len(readyStrs)
  450. }
  451. _, err := exproc.ExpectFunc(matchSet)
  452. return err
  453. }
  454. func spawnWithExpect(args []string, expected string) error {
  455. return spawnWithExpects(args, []string{expected}...)
  456. }
  457. func spawnWithExpects(args []string, xs ...string) error {
  458. proc, err := spawnCmd(args)
  459. if err != nil {
  460. return err
  461. }
  462. // process until either stdout or stderr contains
  463. // the expected string
  464. var (
  465. lines []string
  466. lineFunc = func(txt string) bool { return true }
  467. )
  468. for _, txt := range xs {
  469. for {
  470. l, lerr := proc.ExpectFunc(lineFunc)
  471. if lerr != nil {
  472. proc.Close()
  473. return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
  474. }
  475. lines = append(lines, l)
  476. if strings.Contains(l, txt) {
  477. break
  478. }
  479. }
  480. }
  481. perr := proc.Close()
  482. if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
  483. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  484. }
  485. return perr
  486. }
  487. // proxies returns only the proxy etcdProcess.
  488. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  489. return epc.procs[epc.cfg.clusterSize:]
  490. }
  491. func (epc *etcdProcessCluster) processes() []*etcdProcess {
  492. return epc.procs[:epc.cfg.clusterSize]
  493. }
  494. func (epc *etcdProcessCluster) endpoints() []string {
  495. eps := make([]string, epc.cfg.clusterSize)
  496. for i, ep := range epc.processes() {
  497. eps[i] = ep.cfg.acurl
  498. }
  499. return eps
  500. }
  501. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  502. eps := make([]string, epc.cfg.clusterSize)
  503. for i, ep := range epc.processes() {
  504. eps[i] = ep.cfg.acurlHost
  505. }
  506. return eps
  507. }
  508. func (epc *etcdProcessCluster) withStopSignal(sig os.Signal) os.Signal {
  509. ret := epc.procs[0].proc.StopSignal
  510. for _, p := range epc.procs {
  511. p.proc.StopSignal = sig
  512. }
  513. return ret
  514. }
  515. func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
  516. errc := make(chan error, 1)
  517. go func() { errc <- p.Close() }()
  518. select {
  519. case err := <-errc:
  520. return err
  521. case <-time.After(d):
  522. p.Stop()
  523. // retry close after stopping to collect SIGQUIT data, if any
  524. closeWithTimeout(p, time.Second)
  525. }
  526. return fmt.Errorf("took longer than %v to Close process %+v", d, p)
  527. }