connectionpool_systems_test.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // +build conn_pool
  5. package gocql
  6. import (
  7. "flag"
  8. "fmt"
  9. "log"
  10. "os/exec"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "testing"
  15. "time"
  16. )
  17. // connection pool behavior test when nodes are removed from the cluster
  18. // to run this test, see connectionpool_systems_test.sh
  19. var (
  20. flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
  21. flagProto = flag.Int("proto", 2, "protcol version")
  22. flagCQL = flag.String("cql", "3.0.0", "CQL version")
  23. flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
  24. clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
  25. nodesShut = flag.Int("nodesShut", 1, "the number of nodes to shutdown during the test")
  26. flagRetry = flag.Int("retries", 5, "number of times to retry queries")
  27. flagRunSsl = flag.Bool("runssl", false, "Set to true to run ssl test")
  28. clusterHosts []string
  29. )
  30. var initOnce sync.Once
  31. func init() {
  32. flag.Parse()
  33. clusterHosts = strings.Split(*flagCluster, ",")
  34. log.SetFlags(log.Lshortfile | log.LstdFlags)
  35. }
  36. func createTable(s *Session, table string) error {
  37. err := s.Query(table).Consistency(All).Exec()
  38. if *clusterSize > 1 {
  39. // wait for table definition to propogate
  40. time.Sleep(250 * time.Millisecond)
  41. }
  42. return err
  43. }
  44. func createCluster() *ClusterConfig {
  45. cluster := NewCluster(clusterHosts...)
  46. cluster.ProtoVersion = *flagProto
  47. cluster.CQLVersion = *flagCQL
  48. cluster.Timeout = 5 * time.Second
  49. cluster.Consistency = Quorum
  50. if *flagRetry > 0 {
  51. cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
  52. }
  53. if *flagRunSsl {
  54. cluster.SslOpts = &SslOptions{
  55. CertPath: "testdata/pki/gocql.crt",
  56. KeyPath: "testdata/pki/gocql.key",
  57. CaPath: "testdata/pki/ca.crt",
  58. EnableHostVerification: false,
  59. }
  60. }
  61. return cluster
  62. }
  63. func createKeyspace(t testing.T, cluster *ClusterConfig, keyspace string) {
  64. session, err := cluster.CreateSession()
  65. if err != nil {
  66. t.Fatal("createSession:", err)
  67. }
  68. if err = session.Query(`DROP KEYSPACE ` + keyspace).Exec(); err != nil {
  69. t.Log("drop keyspace:", err)
  70. }
  71. err = session.Query(
  72. fmt.Sprintf(
  73. `
  74. CREATE KEYSPACE %s
  75. WITH replication = {
  76. 'class' : 'SimpleStrategy',
  77. 'replication_factor' : %d
  78. }
  79. `,
  80. keyspace,
  81. *flagRF,
  82. ),
  83. ).Consistency(All).Exec()
  84. if err != nil {
  85. t.Fatalf("error creating keyspace %s: %v", keyspace, err)
  86. }
  87. t.Logf("Created keyspace %s", keyspace)
  88. session.Close()
  89. }
  90. func createSession(t testing.T) *Session {
  91. cluster := createCluster()
  92. // Drop and re-create the keyspace once. Different tests should use their own
  93. // individual tables, but can assume that the table does not exist before.
  94. initOnce.Do(func() {
  95. createKeyspace(t, cluster, "gocql_test")
  96. })
  97. cluster.Keyspace = "gocql_test"
  98. session, err := cluster.CreateSession()
  99. if err != nil {
  100. t.Fatal("createSession:", err)
  101. }
  102. return session
  103. }
  104. func TestSimplePool(t *testing.T) {
  105. testConnPool(t, NewSimplePool)
  106. }
  107. func TestRRPolicyConnPool(t *testing.T) {
  108. testConnPool(t, NewRoundRobinConnPool)
  109. }
  110. func TestTAPolicyConnPool(t *testing.T) {
  111. testConnPool(t, NewTokenAwareConnPool)
  112. }
  113. func testConnPool(t *testing.T, connPoolType NewPoolFunc) {
  114. var out []byte
  115. var err error
  116. log.SetFlags(log.Ltime)
  117. // make sure the cluster is running
  118. out, err = exec.Command("ccm", "start").CombinedOutput()
  119. if err != nil {
  120. t.Fatalf("Error running ccm command: %v", err)
  121. fmt.Printf("ccm output:\n%s", string(out))
  122. }
  123. time.Sleep(time.Duration(*clusterSize) * 1000 * time.Millisecond)
  124. // fire up a session (no discovery)
  125. cluster := createCluster()
  126. cluster.ConnPoolType = connPoolType
  127. cluster.DiscoverHosts = false
  128. session, err := cluster.CreateSession()
  129. if err != nil {
  130. t.Fatalf("Error connecting to cluster: %v", err)
  131. }
  132. defer session.Close()
  133. time.Sleep(time.Duration(*clusterSize) * 1000 * time.Millisecond)
  134. if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
  135. t.Logf(
  136. "WARN: Expected %d pool size, but was %d",
  137. (*clusterSize)*cluster.NumConns,
  138. session.Pool.Size(),
  139. )
  140. }
  141. // start some connection monitoring
  142. nilCheckStop := false
  143. nilCount := 0
  144. nilCheck := func() {
  145. // assert that all connections returned by the pool are non-nil
  146. for !nilCheckStop {
  147. actual := session.Pool.Pick(nil)
  148. if actual == nil {
  149. nilCount++
  150. }
  151. }
  152. }
  153. go nilCheck()
  154. // shutdown some hosts
  155. log.Println("shutdown some hosts")
  156. for i := 0; i < *nodesShut; i++ {
  157. out, err = exec.Command("ccm", "node"+strconv.Itoa(i+1), "stop").CombinedOutput()
  158. if err != nil {
  159. t.Fatalf("Error running ccm command: %v", err)
  160. fmt.Printf("ccm output:\n%s", string(out))
  161. }
  162. time.Sleep(1500 * time.Millisecond)
  163. }
  164. time.Sleep(500 * time.Millisecond)
  165. if session.Pool.Size() != ((*clusterSize)-(*nodesShut))*cluster.NumConns {
  166. t.Logf(
  167. "WARN: Expected %d pool size, but was %d",
  168. ((*clusterSize)-(*nodesShut))*cluster.NumConns,
  169. session.Pool.Size(),
  170. )
  171. }
  172. // bringup the shutdown hosts
  173. log.Println("bringup the shutdown hosts")
  174. for i := 0; i < *nodesShut; i++ {
  175. out, err = exec.Command("ccm", "node"+strconv.Itoa(i+1), "start").CombinedOutput()
  176. if err != nil {
  177. t.Fatalf("Error running ccm command: %v", err)
  178. fmt.Printf("ccm output:\n%s", string(out))
  179. }
  180. time.Sleep(1500 * time.Millisecond)
  181. }
  182. time.Sleep(500 * time.Millisecond)
  183. if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
  184. t.Logf(
  185. "WARN: Expected %d pool size, but was %d",
  186. (*clusterSize)*cluster.NumConns,
  187. session.Pool.Size(),
  188. )
  189. }
  190. // assert that all connections returned by the pool are non-nil
  191. if nilCount > 0 {
  192. t.Errorf("%d nil connections returned from %T", nilCount, session.Pool)
  193. }
  194. // shutdown cluster
  195. log.Println("shutdown cluster")
  196. out, err = exec.Command("ccm", "stop").CombinedOutput()
  197. if err != nil {
  198. t.Fatalf("Error running ccm command: %v", err)
  199. fmt.Printf("ccm output:\n%s", string(out))
  200. }
  201. time.Sleep(2500 * time.Millisecond)
  202. if session.Pool.Size() != 0 {
  203. t.Logf(
  204. "WARN: Expected %d pool size, but was %d",
  205. 0,
  206. session.Pool.Size(),
  207. )
  208. }
  209. // start cluster
  210. log.Println("start cluster")
  211. out, err = exec.Command("ccm", "start").CombinedOutput()
  212. if err != nil {
  213. t.Fatalf("Error running ccm command: %v", err)
  214. fmt.Printf("ccm output:\n%s", string(out))
  215. }
  216. time.Sleep(500 * time.Millisecond)
  217. // reset the count
  218. nilCount = 0
  219. time.Sleep(3000 * time.Millisecond)
  220. if session.Pool.Size() != (*clusterSize)*cluster.NumConns {
  221. t.Logf(
  222. "WARN: Expected %d pool size, but was %d",
  223. (*clusterSize)*cluster.NumConns,
  224. session.Pool.Size(),
  225. )
  226. }
  227. // assert that all connections returned by the pool are non-nil
  228. if nilCount > 0 {
  229. t.Errorf("%d nil connections returned from %T", nilCount, session.Pool)
  230. }
  231. nilCheckStop = true
  232. }