etcd_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "io/ioutil"
  18. "net/url"
  19. "os"
  20. "strings"
  21. "github.com/coreos/etcd/pkg/expect"
  22. "github.com/coreos/etcd/pkg/fileutil"
  23. )
  24. const (
  25. etcdProcessBasePort = 20000
  26. certPath = "../integration/fixtures/server.crt"
  27. privateKeyPath = "../integration/fixtures/server.key.insecure"
  28. caPath = "../integration/fixtures/ca.crt"
  29. )
  30. type clientConnType int
  31. const (
  32. clientNonTLS clientConnType = iota
  33. clientTLS
  34. clientTLSAndNonTLS
  35. )
  36. var (
  37. configNoTLS = etcdProcessClusterConfig{
  38. clusterSize: 3,
  39. proxySize: 0,
  40. initialToken: "new",
  41. }
  42. configAutoTLS = etcdProcessClusterConfig{
  43. clusterSize: 3,
  44. isPeerTLS: true,
  45. isPeerAutoTLS: true,
  46. initialToken: "new",
  47. }
  48. configTLS = etcdProcessClusterConfig{
  49. clusterSize: 3,
  50. proxySize: 0,
  51. clientTLS: clientTLS,
  52. isPeerTLS: true,
  53. initialToken: "new",
  54. }
  55. configClientTLS = etcdProcessClusterConfig{
  56. clusterSize: 3,
  57. proxySize: 0,
  58. clientTLS: clientTLS,
  59. initialToken: "new",
  60. }
  61. configClientBoth = etcdProcessClusterConfig{
  62. clusterSize: 1,
  63. proxySize: 0,
  64. clientTLS: clientTLSAndNonTLS,
  65. initialToken: "new",
  66. }
  67. configClientAutoTLS = etcdProcessClusterConfig{
  68. clusterSize: 1,
  69. proxySize: 0,
  70. isClientAutoTLS: true,
  71. clientTLS: clientTLS,
  72. initialToken: "new",
  73. }
  74. configPeerTLS = etcdProcessClusterConfig{
  75. clusterSize: 3,
  76. proxySize: 0,
  77. isPeerTLS: true,
  78. initialToken: "new",
  79. }
  80. configWithProxy = etcdProcessClusterConfig{
  81. clusterSize: 3,
  82. proxySize: 1,
  83. initialToken: "new",
  84. }
  85. configWithProxyTLS = etcdProcessClusterConfig{
  86. clusterSize: 3,
  87. proxySize: 1,
  88. clientTLS: clientTLS,
  89. isPeerTLS: true,
  90. initialToken: "new",
  91. }
  92. configWithProxyPeerTLS = etcdProcessClusterConfig{
  93. clusterSize: 3,
  94. proxySize: 1,
  95. isPeerTLS: true,
  96. initialToken: "new",
  97. }
  98. )
  99. func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
  100. ret := cfg
  101. ret.clusterSize = 1
  102. return &ret
  103. }
  104. type etcdProcessCluster struct {
  105. cfg *etcdProcessClusterConfig
  106. procs []*etcdProcess
  107. }
  108. type etcdProcess struct {
  109. cfg *etcdProcessConfig
  110. proc *expect.ExpectProcess
  111. donec chan struct{} // closed when Interact() terminates
  112. }
  113. type etcdProcessConfig struct {
  114. args []string
  115. dataDirPath string
  116. keepDataDir bool
  117. acurl string
  118. // additional url for tls connection when the etcd process
  119. // serves both http and https
  120. acurltls string
  121. acurlHost string
  122. isProxy bool
  123. }
  124. type etcdProcessClusterConfig struct {
  125. dataDirPath string
  126. keepDataDir bool
  127. clusterSize int
  128. basePort int
  129. proxySize int
  130. clientTLS clientConnType
  131. isPeerTLS bool
  132. isPeerAutoTLS bool
  133. isClientAutoTLS bool
  134. forceNewCluster bool
  135. initialToken string
  136. quotaBackendBytes int64
  137. }
  138. // newEtcdProcessCluster launches a new cluster from etcd processes, returning
  139. // a new etcdProcessCluster once all nodes are ready to accept client requests.
  140. func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
  141. etcdCfgs := cfg.etcdProcessConfigs()
  142. epc := &etcdProcessCluster{
  143. cfg: cfg,
  144. procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
  145. }
  146. // launch etcd processes
  147. for i := range etcdCfgs {
  148. proc, err := newEtcdProcess(etcdCfgs[i])
  149. if err != nil {
  150. epc.Close()
  151. return nil, err
  152. }
  153. epc.procs[i] = proc
  154. }
  155. return epc, epc.Start()
  156. }
  157. func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
  158. if !fileutil.Exist("../bin/etcd") {
  159. return nil, fmt.Errorf("could not find etcd binary")
  160. }
  161. if !cfg.keepDataDir {
  162. if err := os.RemoveAll(cfg.dataDirPath); err != nil {
  163. return nil, err
  164. }
  165. }
  166. child, err := spawnCmd(append([]string{"../bin/etcd"}, cfg.args...))
  167. if err != nil {
  168. return nil, err
  169. }
  170. return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
  171. }
  172. func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
  173. if cfg.basePort == 0 {
  174. cfg.basePort = etcdProcessBasePort
  175. }
  176. clientScheme := "http"
  177. if cfg.clientTLS == clientTLS {
  178. clientScheme = "https"
  179. }
  180. peerScheme := "http"
  181. if cfg.isPeerTLS {
  182. peerScheme = "https"
  183. }
  184. etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
  185. initialCluster := make([]string, cfg.clusterSize)
  186. for i := 0; i < cfg.clusterSize; i++ {
  187. var curls []string
  188. var curl, curltls string
  189. port := cfg.basePort + 2*i
  190. curlHost := fmt.Sprintf("localhost:%d", port)
  191. switch cfg.clientTLS {
  192. case clientNonTLS, clientTLS:
  193. curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
  194. curls = []string{curl}
  195. case clientTLSAndNonTLS:
  196. curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
  197. curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
  198. curls = []string{curl, curltls}
  199. }
  200. purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
  201. name := fmt.Sprintf("testname%d", i)
  202. dataDirPath := cfg.dataDirPath
  203. if cfg.dataDirPath == "" {
  204. var derr error
  205. dataDirPath, derr = ioutil.TempDir("", name+".etcd")
  206. if derr != nil {
  207. panic("could not get tempdir for datadir")
  208. }
  209. }
  210. initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
  211. args := []string{
  212. "--name", name,
  213. "--listen-client-urls", strings.Join(curls, ","),
  214. "--advertise-client-urls", strings.Join(curls, ","),
  215. "--listen-peer-urls", purl.String(),
  216. "--initial-advertise-peer-urls", purl.String(),
  217. "--initial-cluster-token", cfg.initialToken,
  218. "--data-dir", dataDirPath,
  219. }
  220. if cfg.forceNewCluster {
  221. args = append(args, "--force-new-cluster")
  222. }
  223. if cfg.quotaBackendBytes > 0 {
  224. args = append(args,
  225. "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
  226. )
  227. }
  228. args = append(args, cfg.tlsArgs()...)
  229. etcdCfgs[i] = &etcdProcessConfig{
  230. args: args,
  231. dataDirPath: dataDirPath,
  232. keepDataDir: cfg.keepDataDir,
  233. acurl: curl,
  234. acurltls: curltls,
  235. acurlHost: curlHost,
  236. }
  237. }
  238. for i := 0; i < cfg.proxySize; i++ {
  239. port := cfg.basePort + 2*cfg.clusterSize + i + 1
  240. curlHost := fmt.Sprintf("localhost:%d", port)
  241. curl := url.URL{Scheme: clientScheme, Host: curlHost}
  242. name := fmt.Sprintf("testname-proxy%d", i)
  243. dataDirPath, derr := ioutil.TempDir("", name+".etcd")
  244. if derr != nil {
  245. panic("could not get tempdir for datadir")
  246. }
  247. args := []string{
  248. "--name", name,
  249. "--proxy", "on",
  250. "--listen-client-urls", curl.String(),
  251. "--data-dir", dataDirPath,
  252. }
  253. args = append(args, cfg.tlsArgs()...)
  254. etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
  255. args: args,
  256. dataDirPath: dataDirPath,
  257. keepDataDir: cfg.keepDataDir,
  258. acurl: curl.String(),
  259. acurlHost: curlHost,
  260. isProxy: true,
  261. }
  262. }
  263. initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
  264. for i := range etcdCfgs {
  265. etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
  266. }
  267. return etcdCfgs
  268. }
  269. func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
  270. if cfg.clientTLS != clientNonTLS {
  271. if cfg.isClientAutoTLS {
  272. args = append(args, "--auto-tls=true")
  273. } else {
  274. tlsClientArgs := []string{
  275. "--cert-file", certPath,
  276. "--key-file", privateKeyPath,
  277. "--ca-file", caPath,
  278. }
  279. args = append(args, tlsClientArgs...)
  280. }
  281. }
  282. if cfg.isPeerTLS {
  283. if cfg.isPeerAutoTLS {
  284. args = append(args, "--peer-auto-tls=true")
  285. } else {
  286. tlsPeerArgs := []string{
  287. "--peer-cert-file", certPath,
  288. "--peer-key-file", privateKeyPath,
  289. "--peer-ca-file", caPath,
  290. }
  291. args = append(args, tlsPeerArgs...)
  292. }
  293. }
  294. return args
  295. }
  296. func (epc *etcdProcessCluster) Start() (err error) {
  297. readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
  298. readyStr := "enabled capabilities for version"
  299. for i := range epc.procs {
  300. go func(etcdp *etcdProcess) {
  301. etcdp.donec = make(chan struct{})
  302. rs := readyStr
  303. if etcdp.cfg.isProxy {
  304. rs = "httpproxy: endpoints found"
  305. }
  306. _, err := etcdp.proc.Expect(rs)
  307. readyC <- err
  308. close(etcdp.donec)
  309. }(epc.procs[i])
  310. }
  311. for range epc.procs {
  312. if err := <-readyC; err != nil {
  313. epc.Close()
  314. return err
  315. }
  316. }
  317. return nil
  318. }
  319. func (epc *etcdProcessCluster) Restart() error {
  320. for i := range epc.procs {
  321. proc, err := newEtcdProcess(epc.procs[i].cfg)
  322. if err != nil {
  323. epc.Close()
  324. return err
  325. }
  326. epc.procs[i] = proc
  327. }
  328. return epc.Start()
  329. }
  330. func (epc *etcdProcessCluster) Stop() (err error) {
  331. for _, p := range epc.procs {
  332. if p == nil {
  333. continue
  334. }
  335. if curErr := p.proc.Stop(); curErr != nil {
  336. if err != nil {
  337. err = fmt.Errorf("%v; %v", err, curErr)
  338. } else {
  339. err = curErr
  340. }
  341. }
  342. <-p.donec
  343. }
  344. return err
  345. }
  346. func (epc *etcdProcessCluster) Close() error {
  347. err := epc.Stop()
  348. for _, p := range epc.procs {
  349. os.RemoveAll(p.cfg.dataDirPath)
  350. }
  351. return err
  352. }
  353. func spawnCmd(args []string) (*expect.ExpectProcess, error) {
  354. return expect.NewExpect(args[0], args[1:]...)
  355. }
  356. func spawnWithExpect(args []string, expected string) error {
  357. return spawnWithExpects(args, []string{expected}...)
  358. }
  359. func spawnWithExpects(args []string, xs ...string) error {
  360. proc, err := spawnCmd(args)
  361. if err != nil {
  362. return err
  363. }
  364. // process until either stdout or stderr contains
  365. // the expected string
  366. var (
  367. lines []string
  368. lineFunc = func(txt string) bool { return true }
  369. )
  370. for _, txt := range xs {
  371. for {
  372. l, err := proc.ExpectFunc(lineFunc)
  373. if err != nil {
  374. return fmt.Errorf("%v (expected %q, got %q)", err, txt, lines)
  375. }
  376. lines = append(lines, l)
  377. if strings.Contains(l, txt) {
  378. break
  379. }
  380. }
  381. }
  382. perr := proc.Close()
  383. if err != nil {
  384. return err
  385. }
  386. if len(xs) == 0 && proc.LineCount() != 0 { // expect no output
  387. return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
  388. }
  389. return perr
  390. }
  391. // proxies returns only the proxy etcdProcess.
  392. func (epc *etcdProcessCluster) proxies() []*etcdProcess {
  393. return epc.procs[epc.cfg.clusterSize:]
  394. }
  395. func (epc *etcdProcessCluster) backends() []*etcdProcess {
  396. return epc.procs[:epc.cfg.clusterSize]
  397. }
  398. func (epc *etcdProcessCluster) endpoints() []string {
  399. eps := make([]string, epc.cfg.clusterSize)
  400. for i, ep := range epc.backends() {
  401. eps[i] = ep.cfg.acurl
  402. }
  403. return eps
  404. }
  405. func (epc *etcdProcessCluster) grpcEndpoints() []string {
  406. eps := make([]string, epc.cfg.clusterSize)
  407. for i, ep := range epc.backends() {
  408. eps[i] = ep.cfg.acurlHost
  409. }
  410. return eps
  411. }