etcd_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/etcdserver"
  22. "github.com/coreos/etcd/pkg/expect"
  23. "github.com/coreos/etcd/pkg/fileutil"
  24. )
  // etcdProcessBasePort is the port numbering origin used for a test cluster
  // whose config leaves basePort at zero.
  const etcdProcessBasePort = 20000

  // Locations of the binaries and TLS assets used by the tests. They start out
  // empty and are assigned by etcdProcessClusterConfig.etcdProcessConfigs.
  var (
  	// etcd and etcdctl executables
  	binPath, ctlBinPath string

  	// server certificate, its private key, and the CA certificate
  	certPath, privateKeyPath, caPath string
  )
  // clientConnType selects which schemes a member's client URLs use.
  type clientConnType int

  const (
  	// clientNonTLS serves clients over http only; it is the zero value.
  	clientNonTLS clientConnType = iota
  	// clientTLS serves clients over https only.
  	clientTLS
  	// clientTLSAndNonTLS serves clients over both http and https on the
  	// same host:port.
  	clientTLSAndNonTLS
  )
  39. var (
  40. configNoTLS = etcdProcessClusterConfig{
  41. clusterSize: 3,
  42. proxySize: 0,
  43. initialToken: "new",
  44. }
  45. configAutoTLS = etcdProcessClusterConfig{
  46. clusterSize: 3,
  47. isPeerTLS: true,
  48. isPeerAutoTLS: true,
  49. initialToken: "new",
  50. }
  51. configTLS = etcdProcessClusterConfig{
  52. clusterSize: 3,
  53. proxySize: 0,
  54. clientTLS: clientTLS,
  55. isPeerTLS: true,
  56. initialToken: "new",
  57. }
  58. configClientTLS = etcdProcessClusterConfig{
  59. clusterSize: 3,
  60. proxySize: 0,
  61. clientTLS: clientTLS,
  62. initialToken: "new",
  63. }
  64. configClientBoth = etcdProcessClusterConfig{
  65. clusterSize: 1,
  66. proxySize: 0,
  67. clientTLS: clientTLSAndNonTLS,
  68. initialToken: "new",
  69. }
  70. configClientAutoTLS = etcdProcessClusterConfig{
  71. clusterSize: 1,
  72. proxySize: 0,
  73. isClientAutoTLS: true,
  74. clientTLS: clientTLS,
  75. initialToken: "new",
  76. }
  77. configPeerTLS = etcdProcessClusterConfig{
  78. clusterSize: 3,
  79. proxySize: 0,
  80. isPeerTLS: true,
  81. initialToken: "new",
  82. }
  83. configWithProxy = etcdProcessClusterConfig{
  84. clusterSize: 3,
  85. proxySize: 1,
  86. initialToken: "new",
  87. }
  88. configWithProxyTLS = etcdProcessClusterConfig{
  89. clusterSize: 3,
  90. proxySize: 1,
  91. clientTLS: clientTLS,
  92. isPeerTLS: true,
  93. initialToken: "new",
  94. }
  95. configWithProxyPeerTLS = etcdProcessClusterConfig{
  96. clusterSize: 3,
  97. proxySize: 1,
  98. isPeerTLS: true,
  99. initialToken: "new",
  100. }
  101. configClientTLSCertAuth = etcdProcessClusterConfig{
  102. clusterSize: 1,
  103. proxySize: 0,
  104. clientTLS: clientTLS,
  105. initialToken: "new",
  106. clientCertAuthEnabled: true,
  107. }
  108. )
  109. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  110. ret := cfg
  111. ret.clusterSize = 1
  112. return &ret
  113. }
  114. type etcdProcessCluster struct {
  115. cfg *etcdProcessClusterConfig
  116. procs []*etcdProcess
  117. }
  118. type etcdProcess struct {
  119. cfg *etcdProcessConfig
  120. proc *expect.ExpectProcess
  121. donec chan struct{} // closed when Interact() terminates
  122. }
  // etcdProcessConfig describes how to spawn one etcd process.
  type etcdProcessConfig struct {
  	// execPath is the binary to run and args its command-line arguments.
  	execPath string
  	args     []string

  	// dataDirPath is removed before spawning unless keepDataDir is set.
  	dataDirPath string
  	keepDataDir bool

  	name string

  	// purl is the peer URL; it is left at its zero value for proxies.
  	purl url.URL

  	// acurl is the client URL of the process.
  	acurl string
  	// additional url for tls connection when the etcd process
  	// serves both http and https
  	acurltls string
  	// acurlHost is the host:port part of the client URL.
  	acurlHost string

  	initialToken, initialCluster string

  	// isProxy is true when the process runs as a proxy.
  	isProxy bool
  }
  139. type etcdProcessClusterConfig struct {
  140. execPath string
  141. dataDirPath string
  142. keepDataDir bool
  143. clusterSize int
  144. baseScheme string
  145. basePort int
  146. proxySize int
  147. snapCount int // default is 10000
  148. clientTLS clientConnType
  149. clientCertAuthEnabled bool
  150. isPeerTLS bool
  151. isPeerAutoTLS bool
  152. isClientAutoTLS bool
  153. forceNewCluster bool
  154. initialToken string
  155. quotaBackendBytes int64
  156. noStrictReconfig bool
  157. }
  158. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  159. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  160. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  161. etcdCfgs := cfg.etcdProcessConfigs()
  162. epc := &etcdProcessCluster{
  163. cfg: cfg,
  164. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  165. }
  166. // launch etcd processes
  167. for i := range etcdCfgs {
  168. proc, err := newEtcdProcess(etcdCfgs[i])
  169. if err != nil {
  170. epc.Close()
  171. return nil, err
  172. }
  173. epc.procs[i] = proc
  174. }
  175. return epc, epc.Start()
  176. }
  177. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  178. if !fileutil.Exist(cfg.execPath) {
  179. return nil, fmt.Errorf("could not find etcd binary")
  180. }
  181. if !cfg.keepDataDir {
  182. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  183. return nil, err
  184. }
  185. }
  186. child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
  187. if err != nil {
  188. return nil, err
  189. }
  190. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  191. }
  192. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  193. binPath = binDir + "/etcd"
  194. ctlBinPath = binDir + "/etcdctl"
  195. certPath = certDir + "/server.crt"
  196. privateKeyPath = certDir + "/server.key.insecure"
  197. caPath = certDir + "/ca.crt"
  198. if cfg.basePort == 0 {
  199. cfg.basePort = etcdProcessBasePort
  200. }
  201. if cfg.execPath == "" {
  202. cfg.execPath = binPath
  203. }
  204. if cfg.snapCount == 0 {
  205. cfg.snapCount = etcdserver.DefaultSnapCount
  206. }
  207. clientScheme := "http"
  208. if cfg.clientTLS == clientTLS {
  209. clientScheme = "https"
  210. }
  211. peerScheme := cfg.baseScheme
  212. if peerScheme == "" {
  213. peerScheme = "http"
  214. }
  215. if cfg.isPeerTLS {
  216. peerScheme += "s"
  217. }
  218. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  219. initialCluster := make([]string, cfg.clusterSize)
  220. for i := 0; i < cfg.clusterSize; i++ {
  221. var curls []string
  222. var curl, curltls string
  223. port := cfg.basePort + 2*i
  224. curlHost := fmt.Sprintf("localhost:%d", port)
  225. switch cfg.clientTLS {
  226. case clientNonTLS, clientTLS:
  227. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  228. curls = []string{curl}
  229. case clientTLSAndNonTLS:
  230. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  231. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  232. curls = []string{curl, curltls}
  233. }
  234. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  235. name := fmt.Sprintf("testname%d", i)
  236. dataDirPath := cfg.dataDirPath
  237. if cfg.dataDirPath == "" {
  238. var derr error
  239. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  240. if derr != nil {
  241. panic("could not get tempdir for datadir")
  242. }
  243. }
  244. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  245. args := []string{
  246. "--name", name,
  247. "--listen-client-urls", strings.Join(curls, ","),
  248. "--advertise-client-urls", strings.Join(curls, ","),
  249. "--listen-peer-urls", purl.String(),
  250. "--initial-advertise-peer-urls", purl.String(),
  251. "--initial-cluster-token", cfg.initialToken,
  252. "--data-dir", dataDirPath,
  253. "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
  254. }
  255. if cfg.forceNewCluster {
  256. args = append(args, "--force-new-cluster")
  257. }
  258. if cfg.quotaBackendBytes > 0 {
  259. args = append(args,
  260. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  261. )
  262. }
  263. if cfg.noStrictReconfig {
  264. args = append(args, "--strict-reconfig-check=false")
  265. }
  266. args = append(args, cfg.tlsArgs()...)
  267. etcdCfgs[i] = &etcdProcessConfig{
  268. execPath: cfg.execPath,
  269. args: args,
  270. dataDirPath: dataDirPath,
  271. keepDataDir: cfg.keepDataDir,
  272. name: name,
  273. purl: purl,
  274. acurl: curl,
  275. acurltls: curltls,
  276. acurlHost: curlHost,
  277. initialToken: cfg.initialToken,
  278. }
  279. }
  280. for i := 0; i < cfg.proxySize; i++ {
  281. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  282. curlHost := fmt.Sprintf("localhost:%d", port)
  283. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  284. name := fmt.Sprintf("testname-proxy%d", i)
  285. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  286. if derr != nil {
  287. panic("could not get tempdir for datadir")
  288. }
  289. args := []string{
  290. "--name", name,
  291. "--proxy", "on",
  292. "--listen-client-urls", curl.String(),
  293. "--data-dir", dataDirPath,
  294. }
  295. args = append(args, cfg.tlsArgs()...)
  296. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  297. execPath: cfg.execPath,
  298. args: args,
  299. dataDirPath: dataDirPath,
  300. keepDataDir: cfg.keepDataDir,
  301. name: name,
  302. acurl: curl.String(),
  303. acurlHost: curlHost,
  304. isProxy: true,
  305. }
  306. }
  307. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  308. for i := range etcdCfgs {
  309. etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
  310. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  311. }
  312. return etcdCfgs
  313. }
  314. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  315. if cfg.clientTLS != clientNonTLS {
  316. if cfg.isClientAutoTLS {
  317. args = append(args, "--auto-tls=true")
  318. } else {
  319. tlsClientArgs := []string{
  320. "--cert-file", certPath,
  321. "--key-file", privateKeyPath,
  322. "--ca-file", caPath,
  323. }
  324. args = append(args, tlsClientArgs...)
  325. if cfg.clientCertAuthEnabled {
  326. args = append(args, "--client-cert-auth")
  327. }
  328. }
  329. }
  330. if cfg.isPeerTLS {
  331. if cfg.isPeerAutoTLS {
  332. args = append(args, "--peer-auto-tls=true")
  333. } else {
  334. tlsPeerArgs := []string{
  335. "--peer-cert-file", certPath,
  336. "--peer-key-file", privateKeyPath,
  337. "--peer-ca-file", caPath,
  338. }
  339. args = append(args, tlsPeerArgs...)
  340. }
  341. }
  342. return args
  343. }
  344. func (epc *etcdProcessCluster) Start() (err error) {
  345. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  346. for i := range epc.procs {
  347. go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
  348. }
  349. for range epc.procs {
  350. if err := <-readyC; err != nil {
  351. epc.Close()
  352. return err
  353. }
  354. }
  355. return nil
  356. }
  357. func (epc *etcdProcessCluster) RestartAll() error {
  358. for i := range epc.procs {
  359. proc, err := newEtcdProcess(epc.procs[i].cfg)
  360. if err != nil {
  361. epc.Close()
  362. return err
  363. }
  364. epc.procs[i] = proc
  365. }
  366. return epc.Start()
  367. }
  368. func (epc *etcdProcessCluster) StopAll() (err error) {
  369. for _, p := range epc.procs {
  370. if p == nil {
  371. continue
  372. }
  373. if curErr := p.Stop(); curErr != nil {
  374. if err != nil {
  375. err = fmt.Errorf("%v; %v", err, curErr)
  376. } else {
  377. err = curErr
  378. }
  379. }
  380. }
  381. return err
  382. }
  383. func (epc *etcdProcessCluster) Close() error {
  384. err := epc.StopAll()
  385. for _, p := range epc.procs {
  386. // p is nil when newEtcdProcess fails in the middle
  387. // Close still gets called to clean up test data
  388. if p == nil {
  389. continue
  390. }
  391. os.RemoveAll(p.cfg.dataDirPath)
  392. }
  393. return err
  394. }
  395. func (ep *etcdProcess) Restart() error {
  396. newEp, err := newEtcdProcess(ep.cfg)
  397. if err != nil {
  398. ep.Stop()
  399. return err
  400. }
  401. *ep = *newEp
  402. if err = ep.waitReady(); err != nil {
  403. ep.Stop()
  404. return err
  405. }
  406. return nil
  407. }
  408. func (ep *etcdProcess) Stop() error {
  409. if ep == nil {
  410. return nil
  411. }
  412. if err := ep.proc.Stop(); err != nil {
  413. return err
  414. }
  415. <-ep.donec
  416. if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
  417. os.RemoveAll(ep.cfg.purl.Host)
  418. }
  419. return nil
  420. }
  421. func (ep *etcdProcess) waitReady() error {
  422. defer close(ep.donec)
  423. return waitReadyExpectProc(ep.proc, ep.cfg.isProxy)
  424. }
  425. func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error {
  426. readyStrs := []string{"enabled capabilities for version", "published"}
  427. if isProxy {
  428. readyStrs = []string{"httpproxy: endpoints found"}
  429. }
  430. c := 0
  431. matchSet := func(l string) bool {
  432. for _, s := range readyStrs {
  433. if strings.Contains(l, s) {
  434. c++
  435. break
  436. }
  437. }
  438. return c == len(readyStrs)
  439. }
  440. _, err := exproc.ExpectFunc(matchSet)
  441. return err
  442. }
  443. func spawnWithExpect(args []string, expected string) error {
  444. return spawnWithExpects(args, []string{expected}...)
  445. }
  446. func spawnWithExpects(args []string, xs ...string) error {
  447. proc, err := spawnCmd(args)
  448. if err != nil {
  449. return err
  450. }
  451. // process until either stdout or stderr contains
  452. // the expected string
  453. var (
  454. lines []string
  455. lineFunc = func(txt string) bool { return true }
  456. )
  457. for _, txt := range xs {
  458. for {
  459. l, lerr := proc.ExpectFunc(lineFunc)
  460. if lerr != nil {
  461. proc.Close()
  462. return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
  463. }
  464. lines = append(lines, l)
  465. if strings.Contains(l, txt) {
  466. break
  467. }
  468. }
  469. }
  470. perr := proc.Close()
  471. if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
  472. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  473. }
  474. return perr
  475. }
  476. // proxies returns only the proxy etcdProcess.
  477. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  478. return epc.procs[epc.cfg.clusterSize:]
  479. }
  480. func (epc *etcdProcessCluster) processes() []*etcdProcess {
  481. return epc.procs[:epc.cfg.clusterSize]
  482. }
  483. func (epc *etcdProcessCluster) endpoints() []string {
  484. eps := make([]string, epc.cfg.clusterSize)
  485. for i, ep := range epc.processes() {
  486. eps[i] = ep.cfg.acurl
  487. }
  488. return eps
  489. }
  490. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  491. eps := make([]string, epc.cfg.clusterSize)
  492. for i, ep := range epc.processes() {
  493. eps[i] = ep.cfg.acurlHost
  494. }
  495. return eps
  496. }