  // Source file: etcd_test.go (13 KB)
  // Copyright 2016 The etcd Authors
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  //     http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.

  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. "github.com/coreos/etcd/pkg/expect"
  23. "github.com/coreos/etcd/pkg/fileutil"
  24. )
  // etcdProcessBasePort is the first port handed to a test cluster whose
  // configuration leaves basePort at zero.
  const etcdProcessBasePort = 20000
  // Locations of the binaries and TLS assets used by the tests. All of them
  // are (re)assigned by etcdProcessClusterConfig.etcdProcessConfigs from
  // binDir and certDir.
  var (
  	binPath        string // etcd binary
  	ctlBinPath     string // etcdctl binary
  	certPath       string // certificate passed as --cert-file / --peer-cert-file
  	privateKeyPath string // key passed as --key-file / --peer-key-file
  	caPath         string // CA passed as --ca-file / --peer-ca-file
  )
  // clientConnType selects the scheme(s) a member uses for its client URLs.
  type clientConnType int

  const (
  	// clientNonTLS gives each member a single http client URL.
  	clientNonTLS clientConnType = iota
  	// clientTLS gives each member a single https client URL.
  	clientTLS
  	// clientTLSAndNonTLS gives each member both an http and an https
  	// client URL on the same host and port.
  	clientTLSAndNonTLS
  )
  39. var (
  40. configNoTLS = etcdProcessClusterConfig{
  41. clusterSize: 3,
  42. proxySize: 0,
  43. initialToken: "new",
  44. }
  45. configAutoTLS = etcdProcessClusterConfig{
  46. clusterSize: 3,
  47. isPeerTLS: true,
  48. isPeerAutoTLS: true,
  49. initialToken: "new",
  50. }
  51. configTLS = etcdProcessClusterConfig{
  52. clusterSize: 3,
  53. proxySize: 0,
  54. clientTLS: clientTLS,
  55. isPeerTLS: true,
  56. initialToken: "new",
  57. }
  58. configClientTLS = etcdProcessClusterConfig{
  59. clusterSize: 3,
  60. proxySize: 0,
  61. clientTLS: clientTLS,
  62. initialToken: "new",
  63. }
  64. configClientBoth = etcdProcessClusterConfig{
  65. clusterSize: 1,
  66. proxySize: 0,
  67. clientTLS: clientTLSAndNonTLS,
  68. initialToken: "new",
  69. }
  70. configClientAutoTLS = etcdProcessClusterConfig{
  71. clusterSize: 1,
  72. proxySize: 0,
  73. isClientAutoTLS: true,
  74. clientTLS: clientTLS,
  75. initialToken: "new",
  76. }
  77. configPeerTLS = etcdProcessClusterConfig{
  78. clusterSize: 3,
  79. proxySize: 0,
  80. isPeerTLS: true,
  81. initialToken: "new",
  82. }
  83. configWithProxy = etcdProcessClusterConfig{
  84. clusterSize: 3,
  85. proxySize: 1,
  86. initialToken: "new",
  87. }
  88. configWithProxyTLS = etcdProcessClusterConfig{
  89. clusterSize: 3,
  90. proxySize: 1,
  91. clientTLS: clientTLS,
  92. isPeerTLS: true,
  93. initialToken: "new",
  94. }
  95. configWithProxyPeerTLS = etcdProcessClusterConfig{
  96. clusterSize: 3,
  97. proxySize: 1,
  98. isPeerTLS: true,
  99. initialToken: "new",
  100. }
  101. )
  102. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  103. ret := cfg
  104. ret.clusterSize = 1
  105. return &ret
  106. }
  107. type etcdProcessCluster struct {
  108. cfg *etcdProcessClusterConfig
  109. procs []*etcdProcess
  110. }
  111. type etcdProcess struct {
  112. cfg *etcdProcessConfig
  113. proc *expect.ExpectProcess
  114. donec chan struct{} // closed when Interact() terminates
  115. }
  // etcdProcessConfig describes how a single etcd process is launched and how
  // it can be reached.
  type etcdProcessConfig struct {
  	// execPath is the binary to run and args its command-line arguments.
  	execPath string
  	args     []string

  	// dataDirPath is the process data directory. Unless keepDataDir is set
  	// it is removed before the process is spawned.
  	dataDirPath string
  	keepDataDir bool

  	// name is the value passed as --name.
  	name string

  	// purl is the peer URL (zero for proxies).
  	purl url.URL

  	// acurl is the client URL of the process.
  	acurl string
  	// additional url for tls connection when the etcd process
  	// serves both http and https
  	acurltls string
  	// acurlHost is the host:port part of the client URL.
  	acurlHost string

  	initialToken   string
  	initialCluster string

  	// isProxy marks processes started with --proxy on.
  	isProxy bool
  }
  132. type etcdProcessClusterConfig struct {
  133. execPath string
  134. dataDirPath string
  135. keepDataDir bool
  136. clusterSize int
  137. baseScheme string
  138. basePort int
  139. proxySize int
  140. snapCount int // default is 10000
  141. clientTLS clientConnType
  142. clientCertAuthEnabled bool
  143. isPeerTLS bool
  144. isPeerAutoTLS bool
  145. isClientAutoTLS bool
  146. forceNewCluster bool
  147. initialToken string
  148. quotaBackendBytes int64
  149. noStrictReconfig bool
  150. }
  151. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  152. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  153. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  154. etcdCfgs := cfg.etcdProcessConfigs()
  155. epc := &etcdProcessCluster{
  156. cfg: cfg,
  157. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  158. }
  159. // launch etcd processes
  160. for i := range etcdCfgs {
  161. proc, err := newEtcdProcess(etcdCfgs[i])
  162. if err != nil {
  163. epc.Close()
  164. return nil, err
  165. }
  166. epc.procs[i] = proc
  167. }
  168. return epc, epc.Start()
  169. }
  170. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  171. if !fileutil.Exist(cfg.execPath) {
  172. return nil, fmt.Errorf("could not find etcd binary")
  173. }
  174. if !cfg.keepDataDir {
  175. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  176. return nil, err
  177. }
  178. }
  179. child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
  180. if err != nil {
  181. return nil, err
  182. }
  183. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  184. }
  185. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  186. binPath = binDir + "/etcd"
  187. ctlBinPath = binDir + "/etcdctl"
  188. certPath = certDir + "/server.crt"
  189. privateKeyPath = certDir + "/server.key.insecure"
  190. caPath = certDir + "/ca.crt"
  191. if cfg.basePort == 0 {
  192. cfg.basePort = etcdProcessBasePort
  193. }
  194. if cfg.execPath == "" {
  195. cfg.execPath = binPath
  196. }
  197. if cfg.snapCount == 0 {
  198. cfg.snapCount = etcdserver.DefaultSnapCount
  199. }
  200. clientScheme := "http"
  201. if cfg.clientTLS == clientTLS {
  202. clientScheme = "https"
  203. }
  204. peerScheme := cfg.baseScheme
  205. if peerScheme == "" {
  206. peerScheme = "http"
  207. }
  208. if cfg.isPeerTLS {
  209. peerScheme += "s"
  210. }
  211. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  212. initialCluster := make([]string, cfg.clusterSize)
  213. for i := 0; i < cfg.clusterSize; i++ {
  214. var curls []string
  215. var curl, curltls string
  216. port := cfg.basePort + 2*i
  217. curlHost := fmt.Sprintf("localhost:%d", port)
  218. switch cfg.clientTLS {
  219. case clientNonTLS, clientTLS:
  220. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  221. curls = []string{curl}
  222. case clientTLSAndNonTLS:
  223. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  224. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  225. curls = []string{curl, curltls}
  226. }
  227. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  228. name := fmt.Sprintf("testname%d", i)
  229. dataDirPath := cfg.dataDirPath
  230. if cfg.dataDirPath == "" {
  231. var derr error
  232. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  233. if derr != nil {
  234. panic("could not get tempdir for datadir")
  235. }
  236. }
  237. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  238. args := []string{
  239. "--name", name,
  240. "--listen-client-urls", strings.Join(curls, ","),
  241. "--advertise-client-urls", strings.Join(curls, ","),
  242. "--listen-peer-urls", purl.String(),
  243. "--initial-advertise-peer-urls", purl.String(),
  244. "--initial-cluster-token", cfg.initialToken,
  245. "--data-dir", dataDirPath,
  246. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  247. }
  248. if cfg.forceNewCluster {
  249. args = append(args, "--force-new-cluster")
  250. }
  251. if cfg.quotaBackendBytes > 0 {
  252. args = append(args,
  253. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  254. )
  255. }
  256. if cfg.noStrictReconfig {
  257. args = append(args, "--strict-reconfig-check=false")
  258. }
  259. args = append(args, cfg.tlsArgs()...)
  260. etcdCfgs[i] = &etcdProcessConfig{
  261. execPath: cfg.execPath,
  262. args: args,
  263. dataDirPath: dataDirPath,
  264. keepDataDir: cfg.keepDataDir,
  265. name: name,
  266. purl: purl,
  267. acurl: curl,
  268. acurltls: curltls,
  269. acurlHost: curlHost,
  270. initialToken: cfg.initialToken,
  271. }
  272. }
  273. for i := 0; i < cfg.proxySize; i++ {
  274. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  275. curlHost := fmt.Sprintf("localhost:%d", port)
  276. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  277. name := fmt.Sprintf("testname-proxy%d", i)
  278. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  279. if derr != nil {
  280. panic("could not get tempdir for datadir")
  281. }
  282. args := []string{
  283. "--name", name,
  284. "--proxy", "on",
  285. "--listen-client-urls", curl.String(),
  286. "--data-dir", dataDirPath,
  287. }
  288. args = append(args, cfg.tlsArgs()...)
  289. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  290. execPath: cfg.execPath,
  291. args: args,
  292. dataDirPath: dataDirPath,
  293. keepDataDir: cfg.keepDataDir,
  294. name: name,
  295. acurl: curl.String(),
  296. acurlHost: curlHost,
  297. isProxy: true,
  298. }
  299. }
  300. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  301. for i := range etcdCfgs {
  302. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  303. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  304. }
  305. return etcdCfgs
  306. }
  307. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  308. if cfg.clientTLS != clientNonTLS {
  309. if cfg.isClientAutoTLS {
  310. args = append(args, "--auto-tls=true")
  311. } else {
  312. tlsClientArgs := []string{
  313. "--cert-file", certPath,
  314. "--key-file", privateKeyPath,
  315. "--ca-file", caPath,
  316. }
  317. args = append(args, tlsClientArgs...)
  318. if cfg.clientCertAuthEnabled {
  319. args = append(args, "--client-cert-auth")
  320. }
  321. }
  322. }
  323. if cfg.isPeerTLS {
  324. if cfg.isPeerAutoTLS {
  325. args = append(args, "--peer-auto-tls=true")
  326. } else {
  327. tlsPeerArgs := []string{
  328. "--peer-cert-file", certPath,
  329. "--peer-key-file", privateKeyPath,
  330. "--peer-ca-file", caPath,
  331. }
  332. args = append(args, tlsPeerArgs...)
  333. }
  334. }
  335. return args
  336. }
  337. func (epc *etcdProcessCluster) Start() (err error) {
  338. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  339. for i := range epc.procs {
  340. go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
  341. }
  342. for range epc.procs {
  343. if err := <-readyC; err != nil {
  344. epc.Close()
  345. return err
  346. }
  347. }
  348. return nil
  349. }
  350. func (epc *etcdProcessCluster) RestartAll() error {
  351. for i := range epc.procs {
  352. proc, err := newEtcdProcess(epc.procs[i].cfg)
  353. if err != nil {
  354. epc.Close()
  355. return err
  356. }
  357. epc.procs[i] = proc
  358. }
  359. return epc.Start()
  360. }
  361. func (epc *etcdProcessCluster) StopAll() (err error) {
  362. for _, p := range epc.procs {
  363. if p == nil {
  364. continue
  365. }
  366. if curErr := p.Stop(); curErr != nil {
  367. if err != nil {
  368. err = fmt.Errorf("%v; %v", err, curErr)
  369. } else {
  370. err = curErr
  371. }
  372. }
  373. }
  374. return err
  375. }
  376. func (epc *etcdProcessCluster) Close() error {
  377. err := epc.StopAll()
  378. for _, p := range epc.procs {
  379. // p is nil when newEtcdProcess fails in the middle
  380. // Close still gets called to clean up test data
  381. if p == nil {
  382. continue
  383. }
  384. os.RemoveAll(p.cfg.dataDirPath)
  385. }
  386. return err
  387. }
  388. func (ep *etcdProcess) Restart() error {
  389. newEp, err := newEtcdProcess(ep.cfg)
  390. if err != nil {
  391. ep.Stop()
  392. return err
  393. }
  394. *ep = *newEp
  395. if err = ep.waitReady(); err != nil {
  396. ep.Stop()
  397. return err
  398. }
  399. return nil
  400. }
  401. func (ep *etcdProcess) Stop() error {
  402. if ep == nil {
  403. return nil
  404. }
  405. if err := ep.proc.Stop(); err != nil {
  406. return err
  407. }
  408. <-ep.donec
  409. if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
  410. os.RemoveAll(ep.cfg.purl.Host)
  411. }
  412. return nil
  413. }
  414. func (ep *etcdProcess) waitReady() error {
  415. defer close(ep.donec)
  416. return waitReadyExpectProc(ep.proc, ep.cfg.isProxy)
  417. }
  418. func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error {
  419. readyStrs := []string{"enabled capabilities for version", "published"}
  420. if isProxy {
  421. readyStrs = []string{"httpproxy: endpoints found"}
  422. }
  423. c := 0
  424. matchSet := func(l string) bool {
  425. for _, s := range readyStrs {
  426. if strings.Contains(l, s) {
  427. c++
  428. break
  429. }
  430. }
  431. return c == len(readyStrs)
  432. }
  433. _, err := exproc.ExpectFunc(matchSet)
  434. return err
  435. }
  436. func spawnCmd(args []string) (*expect.ExpectProcess, error) {
  437. return expect.NewExpect(args[0], args[1:]...)
  438. }
  439. func spawnWithExpect(args []string, expected string) error {
  440. return spawnWithExpects(args, []string{expected}...)
  441. }
  442. func spawnWithExpects(args []string, xs ...string) error {
  443. proc, err := spawnCmd(args)
  444. if err != nil {
  445. return err
  446. }
  447. // process until either stdout or stderr contains
  448. // the expected string
  449. var (
  450. lines []string
  451. lineFunc = func(txt string) bool { return true }
  452. )
  453. for _, txt := range xs {
  454. for {
  455. l, err := proc.ExpectFunc(lineFunc)
  456. if err != nil {
  457. proc.Close()
  458. return fmt.Errorf("%v (expected %q, got %q)", err, txt, lines)
  459. }
  460. lines = append(lines, l)
  461. if strings.Contains(l, txt) {
  462. break
  463. }
  464. }
  465. }
  466. perr := proc.Close()
  467. if err != nil {
  468. return err
  469. }
  470. if len(xs) == 0 && proc.LineCount() != 0 { // expect no output
  471. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  472. }
  473. return perr
  474. }
  475. // proxies returns only the proxy etcdProcess.
  476. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  477. return epc.procs[epc.cfg.clusterSize:]
  478. }
  479. func (epc *etcdProcessCluster) processes() []*etcdProcess {
  480. return epc.procs[:epc.cfg.clusterSize]
  481. }
  482. func (epc *etcdProcessCluster) endpoints() []string {
  483. eps := make([]string, epc.cfg.clusterSize)
  484. for i, ep := range epc.processes() {
  485. eps[i] = ep.cfg.acurl
  486. }
  487. return eps
  488. }
  489. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  490. eps := make([]string, epc.cfg.clusterSize)
  491. for i, ep := range epc.processes() {
  492. eps[i] = ep.cfg.acurlHost
  493. }
  494. return eps
  495. }