etcdctlv3_test.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package e2e
  15. import (
  16. "fmt"
  17. "os"
  18. "strconv"
  19. "strings"
  20. "testing"
  21. "time"
  22. "github.com/coreos/etcd/pkg/testutil"
  23. )
  24. func TestCtlV3Put(t *testing.T) { testCtl(t, putTest) }
  25. func TestCtlV3PutTimeout(t *testing.T) { testCtl(t, putTest, withDialTimeout(0)) }
  26. func TestCtlV3PutTimeoutQuorum(t *testing.T) { testCtl(t, putTest, withDialTimeout(0), withQuorum()) }
  27. func TestCtlV3PutAutoTLS(t *testing.T) { testCtl(t, putTest, withCfg(configAutoTLS)) }
  28. func TestCtlV3PutPeerTLS(t *testing.T) { testCtl(t, putTest, withCfg(configPeerTLS)) }
  29. func TestCtlV3PutClientTLS(t *testing.T) { testCtl(t, putTest, withCfg(configClientTLS)) }
  30. func TestCtlV3Watch(t *testing.T) { testCtl(t, watchTest) }
  31. func TestCtlV3WatchAutoTLS(t *testing.T) { testCtl(t, watchTest, withCfg(configAutoTLS)) }
  32. func TestCtlV3WatchPeerTLS(t *testing.T) { testCtl(t, watchTest, withCfg(configPeerTLS)) }
  33. // TODO: Watch with client TLS is not working
  34. // func TestCtlV3WatchClientTLS(t *testing.T) {
  35. // testCtl(t, watchTest, withCfg(configClientTLS))
  36. // }
  37. func TestCtlV3WatchInteractive(t *testing.T) { testCtl(t, watchTest, withInteractive()) }
  38. func TestCtlV3WatchInteractiveAutoTLS(t *testing.T) {
  39. testCtl(t, watchTest, withInteractive(), withCfg(configAutoTLS))
  40. }
  41. func TestCtlV3WatchInteractivePeerTLS(t *testing.T) {
  42. testCtl(t, watchTest, withInteractive(), withCfg(configPeerTLS))
  43. }
  44. // TODO: Watch with client TLS is not working
  45. // func TestCtlV3WatchInteractiveClientTLS(t *testing.T) {
  46. // testCtl(t, watchTest, withInteractive(), withCfg(configClientTLS))
  47. // }
  48. type ctlCtx struct {
  49. t *testing.T
  50. cfg etcdProcessClusterConfig
  51. epc *etcdProcessCluster
  52. errc chan error
  53. dialTimeout time.Duration
  54. quorum bool
  55. interactive bool
  56. watchRevision int
  57. }
  58. type ctlOption func(*ctlCtx)
  59. func (cx *ctlCtx) applyOpts(opts []ctlOption) {
  60. for _, opt := range opts {
  61. opt(cx)
  62. }
  63. }
  64. func withCfg(cfg etcdProcessClusterConfig) ctlOption {
  65. return func(cx *ctlCtx) { cx.cfg = cfg }
  66. }
  67. func withDialTimeout(timeout time.Duration) ctlOption {
  68. return func(cx *ctlCtx) { cx.dialTimeout = timeout }
  69. }
  70. func withQuorum() ctlOption {
  71. return func(cx *ctlCtx) { cx.quorum = true }
  72. }
  73. func withInteractive() ctlOption {
  74. return func(cx *ctlCtx) { cx.interactive = true }
  75. }
  76. func withWatchRevision(rev int) ctlOption {
  77. return func(cx *ctlCtx) { cx.watchRevision = rev }
  78. }
  79. func setupCtlV3Test(t *testing.T, cfg etcdProcessClusterConfig, quorum bool) *etcdProcessCluster {
  80. mustEtcdctl(t)
  81. if !quorum {
  82. cfg = *configStandalone(cfg)
  83. }
  84. epc, err := newEtcdProcessCluster(&cfg)
  85. if err != nil {
  86. t.Fatalf("could not start etcd process cluster (%v)", err)
  87. }
  88. return epc
  89. }
  90. func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
  91. defer testutil.AfterTest(t)
  92. var (
  93. defaultDialTimeout = 7 * time.Second
  94. defaultWatchRevision = 1
  95. )
  96. ret := ctlCtx{
  97. t: t,
  98. cfg: configNoTLS,
  99. errc: make(chan error, 1),
  100. dialTimeout: defaultDialTimeout,
  101. watchRevision: defaultWatchRevision,
  102. }
  103. ret.applyOpts(opts)
  104. os.Setenv("ETCDCTL_API", "3")
  105. ret.epc = setupCtlV3Test(ret.t, ret.cfg, ret.quorum)
  106. defer func() {
  107. os.Unsetenv("ETCDCTL_API")
  108. if errC := ret.epc.Close(); errC != nil {
  109. t.Fatalf("error closing etcd processes (%v)", errC)
  110. }
  111. }()
  112. go testFunc(ret)
  113. select {
  114. case <-time.After(2*ret.dialTimeout + time.Second):
  115. if ret.dialTimeout > 0 {
  116. t.Fatalf("test timed out for %v", ret.dialTimeout)
  117. }
  118. case err := <-ret.errc:
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. }
  123. return
  124. }
  125. func putTest(cx ctlCtx) {
  126. key, value := "foo", "bar"
  127. defer close(cx.errc)
  128. if err := ctlV3Put(cx, key, value); err != nil {
  129. if cx.dialTimeout > 0 && isGRPCTimedout(err) {
  130. cx.errc <- fmt.Errorf("put error (%v)", err)
  131. return
  132. }
  133. }
  134. if err := ctlV3Get(cx, key, value); err != nil {
  135. if cx.dialTimeout > 0 && isGRPCTimedout(err) {
  136. cx.errc <- fmt.Errorf("get error (%v)", err)
  137. return
  138. }
  139. }
  140. }
  141. func watchTest(cx ctlCtx) {
  142. key, value := "foo", "bar"
  143. defer close(cx.errc)
  144. go func() {
  145. if err := ctlV3Put(cx, key, value); err != nil {
  146. cx.t.Fatal(err)
  147. }
  148. }()
  149. if err := ctlV3Watch(cx, key, value); err != nil {
  150. if cx.dialTimeout > 0 && isGRPCTimedout(err) {
  151. cx.errc <- fmt.Errorf("watch error (%v)", err)
  152. return
  153. }
  154. }
  155. }
  156. func ctlV3PrefixArgs(clus *etcdProcessCluster, dialTimeout time.Duration) []string {
  157. if len(clus.proxies()) > 0 { // TODO: add proxy check as in v2
  158. panic("v3 proxy not implemented")
  159. }
  160. endpoints := ""
  161. if backends := clus.backends(); len(backends) != 0 {
  162. es := []string{}
  163. for _, b := range backends {
  164. es = append(es, stripSchema(b.cfg.acurl))
  165. }
  166. endpoints = strings.Join(es, ",")
  167. }
  168. cmdArgs := []string{"../bin/etcdctl", "--endpoints", endpoints, "--dial-timeout", dialTimeout.String()}
  169. if clus.cfg.clientTLS == clientTLS {
  170. cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
  171. }
  172. return cmdArgs
  173. }
  174. func ctlV3Put(cx ctlCtx, key, value string) error {
  175. cmdArgs := append(ctlV3PrefixArgs(cx.epc, cx.dialTimeout), "put", key, value)
  176. return spawnWithExpect(cmdArgs, "OK")
  177. }
  178. func ctlV3Get(cx ctlCtx, key, value string) error {
  179. cmdArgs := append(ctlV3PrefixArgs(cx.epc, cx.dialTimeout), "get", key)
  180. if !cx.quorum {
  181. cmdArgs = append(cmdArgs, "--consistency", "s")
  182. }
  183. return spawnWithExpects(cmdArgs, key, value)
  184. }
  185. func ctlV3Watch(cx ctlCtx, key, value string) error {
  186. cmdArgs := append(ctlV3PrefixArgs(cx.epc, cx.dialTimeout), "watch")
  187. if !cx.interactive {
  188. if cx.watchRevision > 0 {
  189. cmdArgs = append(cmdArgs, "--rev", strconv.Itoa(cx.watchRevision))
  190. }
  191. cmdArgs = append(cmdArgs, key)
  192. return spawnWithExpects(cmdArgs, key, value)
  193. }
  194. cmdArgs = append(cmdArgs, "--interactive")
  195. proc, err := spawnCmd(cmdArgs)
  196. if err != nil {
  197. return err
  198. }
  199. watchLine := fmt.Sprintf("watch %s", key)
  200. if cx.watchRevision > 0 {
  201. watchLine = fmt.Sprintf("watch %s --rev %d", key, cx.watchRevision)
  202. }
  203. if err = proc.SendLine(watchLine); err != nil {
  204. return err
  205. }
  206. _, err = proc.Expect(key)
  207. if err != nil {
  208. return err
  209. }
  210. _, err = proc.Expect(value)
  211. if err != nil {
  212. return err
  213. }
  214. return proc.Close()
  215. }
  216. func isGRPCTimedout(err error) bool {
  217. return strings.Contains(err.Error(), "grpc: timed out trying to connect")
  218. }
  219. func stripSchema(s string) string {
  220. if strings.HasPrefix(s, "http://") {
  221. s = strings.Replace(s, "http://", "", -1)
  222. }
  223. if strings.HasPrefix(s, "https://") {
  224. s = strings.Replace(s, "https://", "", -1)
  225. }
  226. return s
  227. }